blob: 8bb74b438895002c59879801a17689a61ede8500 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimesdf6ac7e2019-09-26 17:02:59 +020022import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070023try:
24 import ctypes
25except ImportError:
26 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000027
Antoine Pitrou05d936d2010-10-13 11:38:36 +000028ssl = support.import_module("ssl")
29
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020030from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000031
Paul Monsonf3550692019-06-19 13:09:54 -070032Py_DEBUG = hasattr(sys, 'gettotalrefcount')
33Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
34
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010035PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000036HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020037IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010038IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
39IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010040PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000041
Victor Stinner3ef63442019-02-19 18:06:03 +010042PROTOCOL_TO_TLS_VERSION = {}
43for proto, ver in (
44 ("PROTOCOL_SSLv23", "SSLv3"),
45 ("PROTOCOL_TLSv1", "TLSv1"),
46 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
47):
48 try:
49 proto = getattr(ssl, proto)
50 ver = getattr(ssl.TLSVersion, ver)
51 except AttributeError:
52 continue
53 PROTOCOL_TO_TLS_VERSION[proto] = ver
54
Christian Heimesefff7062013-11-21 03:35:02 +010055def data_file(*name):
56 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000057
Antoine Pitrou81564092010-10-08 23:06:24 +000058# The custom key and certificate files used in test_ssl are generated
59# using Lib/test/make_ssl_certs.py.
60# Other certificates are simply fetched from the Internet servers they
61# are meant to authenticate.
62
Antoine Pitrou152efa22010-05-16 18:19:27 +000063CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000064BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000065ONLYCERT = data_file("ssl_cert.pem")
66ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000067BYTES_ONLYCERT = os.fsencode(ONLYCERT)
68BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020069CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
70ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
71KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000072CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000073BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010074CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
75CAFILE_CACERT = data_file("capath", "5ed36f99.0")
76
Christian Heimesbd5c7d22018-01-20 15:16:30 +010077CERTFILE_INFO = {
78 'issuer': ((('countryName', 'XY'),),
79 (('localityName', 'Castle Anthrax'),),
80 (('organizationName', 'Python Software Foundation'),),
81 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020082 'notAfter': 'Aug 26 14:23:15 2028 GMT',
83 'notBefore': 'Aug 29 14:23:15 2018 GMT',
84 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010085 'subject': ((('countryName', 'XY'),),
86 (('localityName', 'Castle Anthrax'),),
87 (('organizationName', 'Python Software Foundation'),),
88 (('commonName', 'localhost'),)),
89 'subjectAltName': (('DNS', 'localhost'),),
90 'version': 3
91}
Antoine Pitrou152efa22010-05-16 18:19:27 +000092
Christian Heimes22587792013-11-21 23:56:13 +010093# empty CRL
94CRLFILE = data_file("revocation.crl")
95
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010096# Two keys and certs signed by the same CA (for SNI tests)
97SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020098SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010099
100SIGNED_CERTFILE_INFO = {
101 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
102 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
103 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
104 'issuer': ((('countryName', 'XY'),),
105 (('organizationName', 'Python Software Foundation CA'),),
106 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200107 'notAfter': 'Jul 7 14:23:16 2028 GMT',
108 'notBefore': 'Aug 29 14:23:16 2018 GMT',
109 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100110 'subject': ((('countryName', 'XY'),),
111 (('localityName', 'Castle Anthrax'),),
112 (('organizationName', 'Python Software Foundation'),),
113 (('commonName', 'localhost'),)),
114 'subjectAltName': (('DNS', 'localhost'),),
115 'version': 3
116}
117
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100118SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200119SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100120SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
121SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
122
Martin Panter3840b2a2016-03-27 01:53:46 +0000123# Same certificate as pycacert.pem, but without extra text in file
124SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200125# cert with all kinds of subject alt names
126ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100127IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Martin Panter3d81d932016-01-14 09:36:00 +0000129REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130
131EMPTYCERT = data_file("nullcert.pem")
132BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000133NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200135NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200136NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100137TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000138
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200139DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100140BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
Christian Heimes358cfd42016-09-10 22:43:48 +0200142# Not defined in all versions of OpenSSL
143OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
144OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
145OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
146OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100147OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200148
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100149
Christian Heimesdf6ac7e2019-09-26 17:02:59 +0200150def has_tls_protocol(protocol):
151 """Check if a TLS protocol is available and enabled
152
153 :param protocol: enum ssl._SSLMethod member or name
154 :return: bool
155 """
156 if isinstance(protocol, str):
157 assert protocol.startswith('PROTOCOL_')
158 protocol = getattr(ssl, protocol, None)
159 if protocol is None:
160 return False
161 if protocol in {
162 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
163 ssl.PROTOCOL_TLS_CLIENT
164 }:
165 # auto-negotiate protocols are always available
166 return True
167 name = protocol.name
168 return has_tls_version(name[len('PROTOCOL_'):])
169
170
171@functools.lru_cache
172def has_tls_version(version):
173 """Check if a TLS/SSL version is enabled
174
175 :param version: TLS version name or ssl.TLSVersion member
176 :return: bool
177 """
178 if version == "SSLv2":
179 # never supported and not even in TLSVersion enum
180 return False
181
182 if isinstance(version, str):
183 version = ssl.TLSVersion.__members__[version]
184
185 # check compile time flags like ssl.HAS_TLSv1_2
186 if not getattr(ssl, f'HAS_{version.name}'):
187 return False
188
189 # check runtime and dynamic crypto policy settings. A TLS version may
190 # be compiled in but disabled by a policy or config option.
191 ctx = ssl.SSLContext()
192 if (
193 ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
194 version < ctx.minimum_version
195 ):
196 return False
197 if (
198 ctx.maximum_version != ssl.TLSVersion.MAXIMUM_SUPPORTED and
199 version > ctx.maximum_version
200 ):
201 return False
202
203 return True
204
205
206def requires_tls_version(version):
207 """Decorator to skip tests when a required TLS version is not available
208
209 :param version: TLS version name or ssl.TLSVersion member
210 :return:
211 """
212 def decorator(func):
213 @functools.wraps(func)
214 def wrapper(*args, **kw):
215 if not has_tls_version(version):
216 raise unittest.SkipTest(f"{version} is not available.")
217 else:
218 return func(*args, **kw)
219 return wrapper
220 return decorator
221
222
223requires_minimum_version = unittest.skipUnless(
224 hasattr(ssl.SSLContext, 'minimum_version'),
225 "required OpenSSL >= 1.1.0g"
226)
227
228
Thomas Woutersed03b412007-08-28 21:37:11 +0000229def handle_error(prefix):
230 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000231 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000232 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000233
Antoine Pitroub5218772010-05-21 09:56:06 +0000234def can_clear_options():
235 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200236 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000237
238def no_sslv2_implies_sslv3_hello():
239 # 0.9.7h or higher
240 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
241
Christian Heimes2427b502013-11-23 11:24:32 +0100242def have_verify_flags():
243 # 0.9.8 or higher
244 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
245
Christian Heimesb7b92252018-02-25 09:49:31 +0100246def _have_secp_curves():
247 if not ssl.HAS_ECDH:
248 return False
249 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
250 try:
251 ctx.set_ecdh_curve("secp384r1")
252 except ValueError:
253 return False
254 else:
255 return True
256
257
258HAVE_SECP_CURVES = _have_secp_curves()
259
260
Antoine Pitrouc695c952014-04-28 20:57:36 +0200261def utc_offset(): #NOTE: ignore issues like #1647654
262 # local time = utc time + utc offset
263 if time.daylight and time.localtime().tm_isdst > 0:
264 return -time.altzone # seconds
265 return -time.timezone
266
Christian Heimes9424bb42013-06-17 15:32:57 +0200267def asn1time(cert_time):
268 # Some versions of OpenSSL ignore seconds, see #18207
269 # 0.9.8.i
270 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
271 fmt = "%b %d %H:%M:%S %Y GMT"
272 dt = datetime.datetime.strptime(cert_time, fmt)
273 dt = dt.replace(second=0)
274 cert_time = dt.strftime(fmt)
275 # %d adds leading zero but ASN1_TIME_print() uses leading space
276 if cert_time[4] == "0":
277 cert_time = cert_time[:4] + " " + cert_time[5:]
278
279 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000280
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100281needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
282
Antoine Pitrou23df4832010-08-04 17:14:06 +0000283
Christian Heimesd0486372016-09-10 23:23:33 +0200284def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
285 cert_reqs=ssl.CERT_NONE, ca_certs=None,
286 ciphers=None, certfile=None, keyfile=None,
287 **kwargs):
288 context = ssl.SSLContext(ssl_version)
289 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200290 if cert_reqs == ssl.CERT_NONE:
291 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200292 context.verify_mode = cert_reqs
293 if ca_certs is not None:
294 context.load_verify_locations(ca_certs)
295 if certfile is not None or keyfile is not None:
296 context.load_cert_chain(certfile, keyfile)
297 if ciphers is not None:
298 context.set_ciphers(ciphers)
299 return context.wrap_socket(sock, **kwargs)
300
Christian Heimesa170fa12017-09-15 20:27:30 +0200301
302def testing_context(server_cert=SIGNED_CERTFILE):
303 """Create context
304
305 client_context, server_context, hostname = testing_context()
306 """
307 if server_cert == SIGNED_CERTFILE:
308 hostname = SIGNED_CERTFILE_HOSTNAME
309 elif server_cert == SIGNED_CERTFILE2:
310 hostname = SIGNED_CERTFILE2_HOSTNAME
311 else:
312 raise ValueError(server_cert)
313
314 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
315 client_context.load_verify_locations(SIGNING_CA)
316
317 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
318 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200319 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200320
321 return client_context, server_context, hostname
322
323
Antoine Pitrou152efa22010-05-16 18:19:27 +0000324class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000325
Antoine Pitrou480a1242010-04-28 21:37:09 +0000326 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000327 ssl.CERT_NONE
328 ssl.CERT_OPTIONAL
329 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100330 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100331 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100332 if ssl.HAS_ECDH:
333 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100334 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
335 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000336 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100337 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700338 ssl.OP_NO_SSLv2
339 ssl.OP_NO_SSLv3
340 ssl.OP_NO_TLSv1
341 ssl.OP_NO_TLSv1_3
342 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
343 ssl.OP_NO_TLSv1_1
344 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200345 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000346
Christian Heimes9d50ab52018-02-27 10:17:30 +0100347 def test_private_init(self):
348 with self.assertRaisesRegex(TypeError, "public constructor"):
349 with socket.socket() as s:
350 ssl.SSLSocket(s)
351
Antoine Pitrou172f0252014-04-18 20:33:08 +0200352 def test_str_for_enums(self):
353 # Make sure that the PROTOCOL_* constants have enum-like string
354 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200355 proto = ssl.PROTOCOL_TLS
356 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200357 ctx = ssl.SSLContext(proto)
358 self.assertIs(ctx.protocol, proto)
359
Antoine Pitrou480a1242010-04-28 21:37:09 +0000360 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000361 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000362 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000363 sys.stdout.write("\n RAND_status is %d (%s)\n"
364 % (v, (v and "sufficient randomness") or
365 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200366
367 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
368 self.assertEqual(len(data), 16)
369 self.assertEqual(is_cryptographic, v == 1)
370 if v:
371 data = ssl.RAND_bytes(16)
372 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200373 else:
374 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200375
Victor Stinner1e81a392013-12-19 16:47:04 +0100376 # negative num is invalid
377 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
378 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
379
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100380 if hasattr(ssl, 'RAND_egd'):
381 self.assertRaises(TypeError, ssl.RAND_egd, 1)
382 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000383 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200384 ssl.RAND_add(b"this is a random bytes object", 75.0)
385 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000386
Christian Heimesf77b4b22013-08-21 13:26:05 +0200387 @unittest.skipUnless(os.name == 'posix', 'requires posix')
388 def test_random_fork(self):
389 status = ssl.RAND_status()
390 if not status:
391 self.fail("OpenSSL's PRNG has insufficient randomness")
392
393 rfd, wfd = os.pipe()
394 pid = os.fork()
395 if pid == 0:
396 try:
397 os.close(rfd)
398 child_random = ssl.RAND_pseudo_bytes(16)[0]
399 self.assertEqual(len(child_random), 16)
400 os.write(wfd, child_random)
401 os.close(wfd)
402 except BaseException:
403 os._exit(1)
404 else:
405 os._exit(0)
406 else:
407 os.close(wfd)
408 self.addCleanup(os.close, rfd)
409 _, status = os.waitpid(pid, 0)
410 self.assertEqual(status, 0)
411
412 child_random = os.read(rfd, 16)
413 self.assertEqual(len(child_random), 16)
414 parent_random = ssl.RAND_pseudo_bytes(16)[0]
415 self.assertEqual(len(parent_random), 16)
416
417 self.assertNotEqual(child_random, parent_random)
418
Christian Heimese6dac002018-08-30 07:25:49 +0200419 maxDiff = None
420
Antoine Pitrou480a1242010-04-28 21:37:09 +0000421 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000422 # note that this uses an 'unofficial' function in _ssl.c,
423 # provided solely for this test, to exercise the certificate
424 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100425 self.assertEqual(
426 ssl._ssl._test_decode_cert(CERTFILE),
427 CERTFILE_INFO
428 )
429 self.assertEqual(
430 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
431 SIGNED_CERTFILE_INFO
432 )
433
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200434 # Issue #13034: the subjectAltName in some certificates
435 # (notably projects.developer.nokia.com:443) wasn't parsed
436 p = ssl._ssl._test_decode_cert(NOKIACERT)
437 if support.verbose:
438 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
439 self.assertEqual(p['subjectAltName'],
440 (('DNS', 'projects.developer.nokia.com'),
441 ('DNS', 'projects.forum.nokia.com'))
442 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100443 # extra OCSP and AIA fields
444 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
445 self.assertEqual(p['caIssuers'],
446 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
447 self.assertEqual(p['crlDistributionPoints'],
448 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000449
Christian Heimesa37f5242019-01-15 23:47:42 +0100450 def test_parse_cert_CVE_2019_5010(self):
451 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
452 if support.verbose:
453 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
454 self.assertEqual(
455 p,
456 {
457 'issuer': (
458 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
459 'notAfter': 'Jun 14 18:00:58 2028 GMT',
460 'notBefore': 'Jun 18 18:00:58 2018 GMT',
461 'serialNumber': '02',
462 'subject': ((('countryName', 'UK'),),
463 (('commonName',
464 'codenomicon-vm-2.test.lal.cisco.com'),)),
465 'subjectAltName': (
466 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
467 'version': 3
468 }
469 )
470
Christian Heimes824f7f32013-08-17 00:54:47 +0200471 def test_parse_cert_CVE_2013_4238(self):
472 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
473 if support.verbose:
474 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
475 subject = ((('countryName', 'US'),),
476 (('stateOrProvinceName', 'Oregon'),),
477 (('localityName', 'Beaverton'),),
478 (('organizationName', 'Python Software Foundation'),),
479 (('organizationalUnitName', 'Python Core Development'),),
480 (('commonName', 'null.python.org\x00example.org'),),
481 (('emailAddress', 'python-dev@python.org'),))
482 self.assertEqual(p['subject'], subject)
483 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200484 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
485 san = (('DNS', 'altnull.python.org\x00example.com'),
486 ('email', 'null@python.org\x00user@example.org'),
487 ('URI', 'http://null.python.org\x00http://example.org'),
488 ('IP Address', '192.0.2.1'),
489 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
490 else:
491 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
492 san = (('DNS', 'altnull.python.org\x00example.com'),
493 ('email', 'null@python.org\x00user@example.org'),
494 ('URI', 'http://null.python.org\x00http://example.org'),
495 ('IP Address', '192.0.2.1'),
496 ('IP Address', '<invalid>'))
497
498 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200499
Christian Heimes1c03abd2016-09-06 23:25:35 +0200500 def test_parse_all_sans(self):
501 p = ssl._ssl._test_decode_cert(ALLSANFILE)
502 self.assertEqual(p['subjectAltName'],
503 (
504 ('DNS', 'allsans'),
505 ('othername', '<unsupported>'),
506 ('othername', '<unsupported>'),
507 ('email', 'user@example.org'),
508 ('DNS', 'www.example.org'),
509 ('DirName',
510 ((('countryName', 'XY'),),
511 (('localityName', 'Castle Anthrax'),),
512 (('organizationName', 'Python Software Foundation'),),
513 (('commonName', 'dirname example'),))),
514 ('URI', 'https://www.python.org/'),
515 ('IP Address', '127.0.0.1'),
516 ('IP Address', '0:0:0:0:0:0:0:1\n'),
517 ('Registered ID', '1.2.3.4.5')
518 )
519 )
520
Antoine Pitrou480a1242010-04-28 21:37:09 +0000521 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000522 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000523 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000524 d1 = ssl.PEM_cert_to_DER_cert(pem)
525 p2 = ssl.DER_cert_to_PEM_cert(d1)
526 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000527 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000528 if not p2.startswith(ssl.PEM_HEADER + '\n'):
529 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
530 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
531 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000532
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000533 def test_openssl_version(self):
534 n = ssl.OPENSSL_VERSION_NUMBER
535 t = ssl.OPENSSL_VERSION_INFO
536 s = ssl.OPENSSL_VERSION
537 self.assertIsInstance(n, int)
538 self.assertIsInstance(t, tuple)
539 self.assertIsInstance(s, str)
540 # Some sanity checks follow
541 # >= 0.9
542 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400543 # < 3.0
544 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000545 major, minor, fix, patch, status = t
546 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400547 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000548 self.assertGreaterEqual(minor, 0)
549 self.assertLess(minor, 256)
550 self.assertGreaterEqual(fix, 0)
551 self.assertLess(fix, 256)
552 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100553 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000554 self.assertGreaterEqual(status, 0)
555 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400556 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200557 if IS_LIBRESSL:
558 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100559 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400560 else:
561 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100562 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000563
Antoine Pitrou9d543662010-04-23 23:10:32 +0000564 @support.cpython_only
565 def test_refcycle(self):
566 # Issue #7943: an SSL object doesn't create reference cycles with
567 # itself.
568 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200569 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000570 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100571 with support.check_warnings(("", ResourceWarning)):
572 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100573 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000574
Antoine Pitroua468adc2010-09-14 14:43:44 +0000575 def test_wrapped_unconnected(self):
576 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200577 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000578 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200579 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100580 self.assertRaises(OSError, ss.recv, 1)
581 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
582 self.assertRaises(OSError, ss.recvfrom, 1)
583 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
584 self.assertRaises(OSError, ss.send, b'x')
585 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200586 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100587 self.assertRaises(NotImplementedError, ss.sendmsg,
588 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200589 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
590 self.assertRaises(NotImplementedError, ss.recvmsg_into,
591 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000592
Antoine Pitrou40f08742010-04-24 22:04:40 +0000593 def test_timeout(self):
594 # Issue #8524: when creating an SSL socket, the timeout of the
595 # original socket should be retained.
596 for timeout in (None, 0.0, 5.0):
597 s = socket.socket(socket.AF_INET)
598 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200599 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100600 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000601
Christian Heimesd0486372016-09-10 23:23:33 +0200602 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000603 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000604 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000605 "certfile must be specified",
606 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000607 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000608 "certfile must be specified for server-side operations",
609 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000610 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000611 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200612 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100613 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
614 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200615 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200616 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000617 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000618 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000619 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200620 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000621 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000622 ssl.wrap_socket(sock,
623 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000624 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200625 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000626 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000627 ssl.wrap_socket(sock,
628 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000629 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000630
Martin Panter3464ea22016-02-01 21:58:11 +0000631 def bad_cert_test(self, certfile):
632 """Check that trying to use the given client certificate fails"""
633 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
634 certfile)
635 sock = socket.socket()
636 self.addCleanup(sock.close)
637 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200638 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200639 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000640
641 def test_empty_cert(self):
642 """Wrapping with an empty cert file"""
643 self.bad_cert_test("nullcert.pem")
644
645 def test_malformed_cert(self):
646 """Wrapping with a badly formatted certificate (syntax error)"""
647 self.bad_cert_test("badcert.pem")
648
649 def test_malformed_key(self):
650 """Wrapping with a badly formatted key (syntax error)"""
651 self.bad_cert_test("badkey.pem")
652
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000653 def test_match_hostname(self):
654 def ok(cert, hostname):
655 ssl.match_hostname(cert, hostname)
656 def fail(cert, hostname):
657 self.assertRaises(ssl.CertificateError,
658 ssl.match_hostname, cert, hostname)
659
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100660 # -- Hostname matching --
661
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000662 cert = {'subject': ((('commonName', 'example.com'),),)}
663 ok(cert, 'example.com')
664 ok(cert, 'ExAmple.cOm')
665 fail(cert, 'www.example.com')
666 fail(cert, '.example.com')
667 fail(cert, 'example.org')
668 fail(cert, 'exampleXcom')
669
670 cert = {'subject': ((('commonName', '*.a.com'),),)}
671 ok(cert, 'foo.a.com')
672 fail(cert, 'bar.foo.a.com')
673 fail(cert, 'a.com')
674 fail(cert, 'Xa.com')
675 fail(cert, '.a.com')
676
Mandeep Singhede2ac92017-11-27 04:01:27 +0530677 # only match wildcards when they are the only thing
678 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000679 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530680 fail(cert, 'foo.com')
681 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000682 fail(cert, 'bar.com')
683 fail(cert, 'foo.a.com')
684 fail(cert, 'bar.foo.com')
685
Christian Heimes824f7f32013-08-17 00:54:47 +0200686 # NULL bytes are bad, CVE-2013-4073
687 cert = {'subject': ((('commonName',
688 'null.python.org\x00example.org'),),)}
689 ok(cert, 'null.python.org\x00example.org') # or raise an error?
690 fail(cert, 'example.org')
691 fail(cert, 'null.python.org')
692
Georg Brandl72c98d32013-10-27 07:16:53 +0100693 # error cases with wildcards
694 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
695 fail(cert, 'bar.foo.a.com')
696 fail(cert, 'a.com')
697 fail(cert, 'Xa.com')
698 fail(cert, '.a.com')
699
700 cert = {'subject': ((('commonName', 'a.*.com'),),)}
701 fail(cert, 'a.foo.com')
702 fail(cert, 'a..com')
703 fail(cert, 'a.com')
704
705 # wildcard doesn't match IDNA prefix 'xn--'
706 idna = 'püthon.python.org'.encode("idna").decode("ascii")
707 cert = {'subject': ((('commonName', idna),),)}
708 ok(cert, idna)
709 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
710 fail(cert, idna)
711 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
712 fail(cert, idna)
713
714 # wildcard in first fragment and IDNA A-labels in sequent fragments
715 # are supported.
716 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
717 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530718 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
719 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100720 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
721 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
722
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000723 # Slightly fake real-world example
724 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
725 'subject': ((('commonName', 'linuxfrz.org'),),),
726 'subjectAltName': (('DNS', 'linuxfr.org'),
727 ('DNS', 'linuxfr.com'),
728 ('othername', '<unsupported>'))}
729 ok(cert, 'linuxfr.org')
730 ok(cert, 'linuxfr.com')
731 # Not a "DNS" entry
732 fail(cert, '<unsupported>')
733 # When there is a subjectAltName, commonName isn't used
734 fail(cert, 'linuxfrz.org')
735
736 # A pristine real-world example
737 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
738 'subject': ((('countryName', 'US'),),
739 (('stateOrProvinceName', 'California'),),
740 (('localityName', 'Mountain View'),),
741 (('organizationName', 'Google Inc'),),
742 (('commonName', 'mail.google.com'),))}
743 ok(cert, 'mail.google.com')
744 fail(cert, 'gmail.com')
745 # Only commonName is considered
746 fail(cert, 'California')
747
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100748 # -- IPv4 matching --
749 cert = {'subject': ((('commonName', 'example.com'),),),
750 'subjectAltName': (('DNS', 'example.com'),
751 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200752 ('IP Address', '14.15.16.17'),
753 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100754 ok(cert, '10.11.12.13')
755 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200756 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
757 fail(cert, '127.1')
758 fail(cert, '14.15.16.17 ')
759 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100760 fail(cert, '14.15.16.18')
761 fail(cert, 'example.net')
762
763 # -- IPv6 matching --
Zackery Spytzc2cda632019-06-30 09:24:43 -0600764 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100765 cert = {'subject': ((('commonName', 'example.com'),),),
766 'subjectAltName': (
767 ('DNS', 'example.com'),
768 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
769 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
770 ok(cert, '2001::cafe')
771 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200772 fail(cert, '2003::baba ')
773 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100774 fail(cert, '2003::bebe')
775 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100776
777 # -- Miscellaneous --
778
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000779 # Neither commonName nor subjectAltName
780 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
781 'subject': ((('countryName', 'US'),),
782 (('stateOrProvinceName', 'California'),),
783 (('localityName', 'Mountain View'),),
784 (('organizationName', 'Google Inc'),))}
785 fail(cert, 'mail.google.com')
786
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200787 # No DNS entry in subjectAltName but a commonName
788 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
789 'subject': ((('countryName', 'US'),),
790 (('stateOrProvinceName', 'California'),),
791 (('localityName', 'Mountain View'),),
792 (('commonName', 'mail.google.com'),)),
793 'subjectAltName': (('othername', 'blabla'), )}
794 ok(cert, 'mail.google.com')
795
796 # No DNS entry subjectAltName and no commonName
797 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
798 'subject': ((('countryName', 'US'),),
799 (('stateOrProvinceName', 'California'),),
800 (('localityName', 'Mountain View'),),
801 (('organizationName', 'Google Inc'),)),
802 'subjectAltName': (('othername', 'blabla'),)}
803 fail(cert, 'google.com')
804
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000805 # Empty cert / no cert
806 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
807 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
808
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200809 # Issue #17980: avoid denials of service by refusing more than one
810 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100811 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
812 with self.assertRaisesRegex(
813 ssl.CertificateError,
814 "partial wildcards in leftmost label are not supported"):
815 ssl.match_hostname(cert, 'axxb.example.com')
816
817 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
818 with self.assertRaisesRegex(
819 ssl.CertificateError,
820 "wildcard can only be present in the leftmost label"):
821 ssl.match_hostname(cert, 'www.sub.example.com')
822
823 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
824 with self.assertRaisesRegex(
825 ssl.CertificateError,
826 "too many wildcards"):
827 ssl.match_hostname(cert, 'axxbxxc.example.com')
828
829 cert = {'subject': ((('commonName', '*'),),)}
830 with self.assertRaisesRegex(
831 ssl.CertificateError,
832 "sole wildcard without additional labels are not support"):
833 ssl.match_hostname(cert, 'host')
834
835 cert = {'subject': ((('commonName', '*.com'),),)}
836 with self.assertRaisesRegex(
837 ssl.CertificateError,
838 r"hostname 'com' doesn't match '\*.com'"):
839 ssl.match_hostname(cert, 'com')
840
841 # extra checks for _inet_paton()
842 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
843 with self.assertRaises(ValueError):
844 ssl._inet_paton(invalid)
845 for ipaddr in ['127.0.0.1', '192.168.0.1']:
846 self.assertTrue(ssl._inet_paton(ipaddr))
Zackery Spytzc2cda632019-06-30 09:24:43 -0600847 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100848 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
849 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200850
Antoine Pitroud5323212010-10-22 18:19:07 +0000851 def test_server_side(self):
852 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200853 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000854 with socket.socket() as sock:
855 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
856 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000857
Antoine Pitroud6494802011-07-21 01:11:30 +0200858 def test_unknown_channel_binding(self):
859 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200860 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200861 c = socket.socket(socket.AF_INET)
862 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200863 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100864 with self.assertRaises(ValueError):
865 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200866 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200867
868 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
869 "'tls-unique' channel binding not available")
870 def test_tls_unique_channel_binding(self):
871 # unconnected should return None for known type
872 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200873 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100874 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200875 # the same for server-side
876 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200877 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100878 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200879
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600880 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200881 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600882 r = repr(ss)
883 with self.assertWarns(ResourceWarning) as cm:
884 ss = None
885 support.gc_collect()
886 self.assertIn(r, str(cm.warning.args[0]))
887
Christian Heimes6d7ad132013-06-09 18:02:55 +0200888 def test_get_default_verify_paths(self):
889 paths = ssl.get_default_verify_paths()
890 self.assertEqual(len(paths), 6)
891 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
892
893 with support.EnvironmentVarGuard() as env:
894 env["SSL_CERT_DIR"] = CAPATH
895 env["SSL_CERT_FILE"] = CERTFILE
896 paths = ssl.get_default_verify_paths()
897 self.assertEqual(paths.cafile, CERTFILE)
898 self.assertEqual(paths.capath, CAPATH)
899
Christian Heimes44109d72013-11-22 01:51:30 +0100900 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
901 def test_enum_certificates(self):
902 self.assertTrue(ssl.enum_certificates("CA"))
903 self.assertTrue(ssl.enum_certificates("ROOT"))
904
905 self.assertRaises(TypeError, ssl.enum_certificates)
906 self.assertRaises(WindowsError, ssl.enum_certificates, "")
907
Christian Heimesc2d65e12013-11-22 16:13:55 +0100908 trust_oids = set()
909 for storename in ("CA", "ROOT"):
910 store = ssl.enum_certificates(storename)
911 self.assertIsInstance(store, list)
912 for element in store:
913 self.assertIsInstance(element, tuple)
914 self.assertEqual(len(element), 3)
915 cert, enc, trust = element
916 self.assertIsInstance(cert, bytes)
917 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Christian Heimes915cd3f2019-09-09 18:06:55 +0200918 self.assertIsInstance(trust, (frozenset, set, bool))
919 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100920 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100921
922 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100923 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200924
Christian Heimes46bebee2013-06-09 19:03:31 +0200925 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100926 def test_enum_crls(self):
927 self.assertTrue(ssl.enum_crls("CA"))
928 self.assertRaises(TypeError, ssl.enum_crls)
929 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200930
Christian Heimes44109d72013-11-22 01:51:30 +0100931 crls = ssl.enum_crls("CA")
932 self.assertIsInstance(crls, list)
933 for element in crls:
934 self.assertIsInstance(element, tuple)
935 self.assertEqual(len(element), 2)
936 self.assertIsInstance(element[0], bytes)
937 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200938
Christian Heimes46bebee2013-06-09 19:03:31 +0200939
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100940 def test_asn1object(self):
941 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
942 '1.3.6.1.5.5.7.3.1')
943
944 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
945 self.assertEqual(val, expected)
946 self.assertEqual(val.nid, 129)
947 self.assertEqual(val.shortname, 'serverAuth')
948 self.assertEqual(val.longname, 'TLS Web Server Authentication')
949 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
950 self.assertIsInstance(val, ssl._ASN1Object)
951 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
952
953 val = ssl._ASN1Object.fromnid(129)
954 self.assertEqual(val, expected)
955 self.assertIsInstance(val, ssl._ASN1Object)
956 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100957 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
958 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100959 for i in range(1000):
960 try:
961 obj = ssl._ASN1Object.fromnid(i)
962 except ValueError:
963 pass
964 else:
965 self.assertIsInstance(obj.nid, int)
966 self.assertIsInstance(obj.shortname, str)
967 self.assertIsInstance(obj.longname, str)
968 self.assertIsInstance(obj.oid, (str, type(None)))
969
970 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
971 self.assertEqual(val, expected)
972 self.assertIsInstance(val, ssl._ASN1Object)
973 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
974 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
975 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100976 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
977 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100978
Christian Heimes72d28502013-11-23 13:56:58 +0100979 def test_purpose_enum(self):
980 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
981 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
982 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
983 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
984 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
985 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
986 '1.3.6.1.5.5.7.3.1')
987
988 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
989 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
990 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
991 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
992 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
993 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
994 '1.3.6.1.5.5.7.3.2')
995
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100996 def test_unsupported_dtls(self):
997 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
998 self.addCleanup(s.close)
999 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +02001000 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001001 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +02001002 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +01001003 with self.assertRaises(NotImplementedError) as cx:
1004 ctx.wrap_socket(s)
1005 self.assertEqual(str(cx.exception), "only stream sockets are supported")
1006
Antoine Pitrouc695c952014-04-28 20:57:36 +02001007 def cert_time_ok(self, timestring, timestamp):
1008 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
1009
1010 def cert_time_fail(self, timestring):
1011 with self.assertRaises(ValueError):
1012 ssl.cert_time_to_seconds(timestring)
1013
1014 @unittest.skipUnless(utc_offset(),
1015 'local time needs to be different from UTC')
1016 def test_cert_time_to_seconds_timezone(self):
1017 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
1018 # results if local timezone is not UTC
1019 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
1020 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
1021
1022 def test_cert_time_to_seconds(self):
1023 timestring = "Jan 5 09:34:43 2018 GMT"
1024 ts = 1515144883.0
1025 self.cert_time_ok(timestring, ts)
1026 # accept keyword parameter, assert its name
1027 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
1028 # accept both %e and %d (space or zero generated by strftime)
1029 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
1030 # case-insensitive
1031 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
1032 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
1033 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
1034 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
1035 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
1036 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
1037 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
1038 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
1039
1040 newyear_ts = 1230768000.0
1041 # leap seconds
1042 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
1043 # same timestamp
1044 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
1045
1046 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
1047 # allow 60th second (even if it is not a leap second)
1048 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
1049 # allow 2nd leap second for compatibility with time.strptime()
1050 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
1051 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
1052
Mike53f7a7c2017-12-14 14:04:53 +03001053 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +02001054 # 99991231235959Z (rfc 5280)
1055 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
1056
1057 @support.run_with_locale('LC_ALL', '')
1058 def test_cert_time_to_seconds_locale(self):
1059 # `cert_time_to_seconds()` should be locale independent
1060
1061 def local_february_name():
1062 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
1063
1064 if local_february_name().lower() == 'feb':
1065 self.skipTest("locale-specific month name needs to be "
1066 "different from C locale")
1067
1068 # locale-independent
1069 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
1070 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
1071
Martin Panter3840b2a2016-03-27 01:53:46 +00001072 def test_connect_ex_error(self):
1073 server = socket.socket(socket.AF_INET)
1074 self.addCleanup(server.close)
1075 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +02001076 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001077 cert_reqs=ssl.CERT_REQUIRED)
1078 self.addCleanup(s.close)
1079 rc = s.connect_ex((HOST, port))
1080 # Issue #19919: Windows machines or VMs hosted on Windows
1081 # machines sometimes return EWOULDBLOCK.
1082 errors = (
1083 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1084 errno.EWOULDBLOCK,
1085 )
1086 self.assertIn(rc, errors)
1087
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001088
Antoine Pitrou152efa22010-05-16 18:19:27 +00001089class ContextTests(unittest.TestCase):
1090
1091 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001092 for protocol in PROTOCOLS:
1093 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001094 ctx = ssl.SSLContext()
1095 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001096 self.assertRaises(ValueError, ssl.SSLContext, -1)
1097 self.assertRaises(ValueError, ssl.SSLContext, 42)
1098
1099 def test_protocol(self):
1100 for proto in PROTOCOLS:
1101 ctx = ssl.SSLContext(proto)
1102 self.assertEqual(ctx.protocol, proto)
1103
1104 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001105 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001106 ctx.set_ciphers("ALL")
1107 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001108 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001109 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001110
Christian Heimes892d66e2018-01-29 14:10:18 +01001111 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1112 "Test applies only to Python default ciphers")
1113 def test_python_ciphers(self):
1114 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1115 ciphers = ctx.get_ciphers()
1116 for suite in ciphers:
1117 name = suite['name']
1118 self.assertNotIn("PSK", name)
1119 self.assertNotIn("SRP", name)
1120 self.assertNotIn("MD5", name)
1121 self.assertNotIn("RC4", name)
1122 self.assertNotIn("3DES", name)
1123
Christian Heimes25bfcd52016-09-06 00:04:45 +02001124 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1125 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001126 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001127 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001128 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001129 self.assertIn('AES256-GCM-SHA384', names)
1130 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001131
Antoine Pitroub5218772010-05-21 09:56:06 +00001132 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001133 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001134 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001135 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001136 # SSLContext also enables these by default
1137 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001138 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1139 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001140 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001141 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001142 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001143 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001144 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1145 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001146 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001147 # Ubuntu has OP_NO_SSLv3 forced on by default
1148 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001149 else:
1150 with self.assertRaises(ValueError):
1151 ctx.options = 0
1152
Christian Heimesa170fa12017-09-15 20:27:30 +02001153 def test_verify_mode_protocol(self):
1154 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001155 # Default value
1156 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1157 ctx.verify_mode = ssl.CERT_OPTIONAL
1158 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1159 ctx.verify_mode = ssl.CERT_REQUIRED
1160 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1161 ctx.verify_mode = ssl.CERT_NONE
1162 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1163 with self.assertRaises(TypeError):
1164 ctx.verify_mode = None
1165 with self.assertRaises(ValueError):
1166 ctx.verify_mode = 42
1167
Christian Heimesa170fa12017-09-15 20:27:30 +02001168 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1169 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1170 self.assertFalse(ctx.check_hostname)
1171
1172 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1173 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1174 self.assertTrue(ctx.check_hostname)
1175
Christian Heimes61d478c2018-01-27 15:51:38 +01001176 def test_hostname_checks_common_name(self):
1177 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1178 self.assertTrue(ctx.hostname_checks_common_name)
1179 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1180 ctx.hostname_checks_common_name = True
1181 self.assertTrue(ctx.hostname_checks_common_name)
1182 ctx.hostname_checks_common_name = False
1183 self.assertFalse(ctx.hostname_checks_common_name)
1184 ctx.hostname_checks_common_name = True
1185 self.assertTrue(ctx.hostname_checks_common_name)
1186 else:
1187 with self.assertRaises(AttributeError):
1188 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001189
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001190 @requires_minimum_version
Christian Heimesc9bc49c2019-09-11 19:24:47 +02001191 @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
Christian Heimes698dde12018-02-27 11:54:43 +01001192 def test_min_max_version(self):
1193 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001194 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1195 # Fedora override the setting to TLS 1.0.
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001196 minimum_range = {
1197 # stock OpenSSL
1198 ssl.TLSVersion.MINIMUM_SUPPORTED,
1199 # Fedora 29 uses TLS 1.0 by default
1200 ssl.TLSVersion.TLSv1,
1201 # RHEL 8 uses TLS 1.2 by default
1202 ssl.TLSVersion.TLSv1_2
1203 }
1204
Christian Heimes34de2d32019-01-18 16:09:30 +01001205 self.assertIn(
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001206 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001207 )
1208 self.assertEqual(
1209 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1210 )
1211
1212 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1213 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1214 self.assertEqual(
1215 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1216 )
1217 self.assertEqual(
1218 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1219 )
1220
1221 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1222 ctx.maximum_version = ssl.TLSVersion.TLSv1
1223 self.assertEqual(
1224 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1225 )
1226 self.assertEqual(
1227 ctx.maximum_version, ssl.TLSVersion.TLSv1
1228 )
1229
1230 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1231 self.assertEqual(
1232 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1233 )
1234
1235 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1236 self.assertIn(
1237 ctx.maximum_version,
1238 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1239 )
1240
1241 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1242 self.assertIn(
1243 ctx.minimum_version,
1244 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1245 )
1246
1247 with self.assertRaises(ValueError):
1248 ctx.minimum_version = 42
1249
1250 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1251
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02001252 self.assertIn(
1253 ctx.minimum_version, minimum_range
Christian Heimes698dde12018-02-27 11:54:43 +01001254 )
1255 self.assertEqual(
1256 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1257 )
1258 with self.assertRaises(ValueError):
1259 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1260 with self.assertRaises(ValueError):
1261 ctx.maximum_version = ssl.TLSVersion.TLSv1
1262
1263
Christian Heimes2427b502013-11-23 11:24:32 +01001264 @unittest.skipUnless(have_verify_flags(),
1265 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001266 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001267 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001268 # default value
1269 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1270 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001271 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1272 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1273 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1274 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1275 ctx.verify_flags = ssl.VERIFY_DEFAULT
1276 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1277 # supports any value
1278 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1279 self.assertEqual(ctx.verify_flags,
1280 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1281 with self.assertRaises(TypeError):
1282 ctx.verify_flags = None
1283
Antoine Pitrou152efa22010-05-16 18:19:27 +00001284 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001285 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001286 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001287 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001288 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1289 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001290 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001291 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001292 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001293 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001294 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001295 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001296 ctx.load_cert_chain(EMPTYCERT)
1297 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001298 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001299 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1300 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1301 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001302 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001303 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001304 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001305 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001306 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001307 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1308 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001310 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001311 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001312 # Password protected key and cert
1313 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1314 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1315 ctx.load_cert_chain(CERTFILE_PROTECTED,
1316 password=bytearray(KEY_PASSWORD.encode()))
1317 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1318 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1319 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1320 bytearray(KEY_PASSWORD.encode()))
1321 with self.assertRaisesRegex(TypeError, "should be a string"):
1322 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1323 with self.assertRaises(ssl.SSLError):
1324 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1325 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1326 # openssl has a fixed limit on the password buffer.
1327 # PEM_BUFSIZE is generally set to 1kb.
1328 # Return a string larger than this.
1329 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1330 # Password callback
1331 def getpass_unicode():
1332 return KEY_PASSWORD
1333 def getpass_bytes():
1334 return KEY_PASSWORD.encode()
1335 def getpass_bytearray():
1336 return bytearray(KEY_PASSWORD.encode())
1337 def getpass_badpass():
1338 return "badpass"
1339 def getpass_huge():
1340 return b'a' * (1024 * 1024)
1341 def getpass_bad_type():
1342 return 9
1343 def getpass_exception():
1344 raise Exception('getpass error')
1345 class GetPassCallable:
1346 def __call__(self):
1347 return KEY_PASSWORD
1348 def getpass(self):
1349 return KEY_PASSWORD
1350 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1351 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1352 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1353 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1354 ctx.load_cert_chain(CERTFILE_PROTECTED,
1355 password=GetPassCallable().getpass)
1356 with self.assertRaises(ssl.SSLError):
1357 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1358 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1359 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1360 with self.assertRaisesRegex(TypeError, "must return a string"):
1361 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1362 with self.assertRaisesRegex(Exception, "getpass error"):
1363 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1364 # Make sure the password function isn't called if it isn't needed
1365 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001366
1367 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001368 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001369 ctx.load_verify_locations(CERTFILE)
1370 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1371 ctx.load_verify_locations(BYTES_CERTFILE)
1372 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1373 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001374 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001375 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001376 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001377 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001378 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001379 ctx.load_verify_locations(BADCERT)
1380 ctx.load_verify_locations(CERTFILE, CAPATH)
1381 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1382
Victor Stinner80f75e62011-01-29 11:31:20 +00001383 # Issue #10989: crash if the second argument type is invalid
1384 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1385
Christian Heimesefff7062013-11-21 03:35:02 +01001386 def test_load_verify_cadata(self):
1387 # test cadata
1388 with open(CAFILE_CACERT) as f:
1389 cacert_pem = f.read()
1390 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1391 with open(CAFILE_NEURONIO) as f:
1392 neuronio_pem = f.read()
1393 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1394
1395 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001396 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001397 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1398 ctx.load_verify_locations(cadata=cacert_pem)
1399 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1400 ctx.load_verify_locations(cadata=neuronio_pem)
1401 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1402 # cert already in hash table
1403 ctx.load_verify_locations(cadata=neuronio_pem)
1404 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1405
1406 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001407 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001408 combined = "\n".join((cacert_pem, neuronio_pem))
1409 ctx.load_verify_locations(cadata=combined)
1410 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1411
1412 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001413 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001414 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1415 neuronio_pem, "tail"]
1416 ctx.load_verify_locations(cadata="\n".join(combined))
1417 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1418
1419 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001420 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001421 ctx.load_verify_locations(cadata=cacert_der)
1422 ctx.load_verify_locations(cadata=neuronio_der)
1423 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1424 # cert already in hash table
1425 ctx.load_verify_locations(cadata=cacert_der)
1426 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1427
1428 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001430 combined = b"".join((cacert_der, neuronio_der))
1431 ctx.load_verify_locations(cadata=combined)
1432 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1433
1434 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001435 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001436 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1437
1438 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1439 ctx.load_verify_locations(cadata="broken")
1440 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1441 ctx.load_verify_locations(cadata=b"broken")
1442
1443
Paul Monsonf3550692019-06-19 13:09:54 -07001444 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001445 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001446 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001447 ctx.load_dh_params(DHFILE)
1448 if os.name != 'nt':
1449 ctx.load_dh_params(BYTES_DHFILE)
1450 self.assertRaises(TypeError, ctx.load_dh_params)
1451 self.assertRaises(TypeError, ctx.load_dh_params, None)
1452 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001453 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001454 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001455 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001456 ctx.load_dh_params(CERTFILE)
1457
Antoine Pitroub0182c82010-10-12 20:09:02 +00001458 def test_session_stats(self):
1459 for proto in PROTOCOLS:
1460 ctx = ssl.SSLContext(proto)
1461 self.assertEqual(ctx.session_stats(), {
1462 'number': 0,
1463 'connect': 0,
1464 'connect_good': 0,
1465 'connect_renegotiate': 0,
1466 'accept': 0,
1467 'accept_good': 0,
1468 'accept_renegotiate': 0,
1469 'hits': 0,
1470 'misses': 0,
1471 'timeouts': 0,
1472 'cache_full': 0,
1473 })
1474
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001475 def test_set_default_verify_paths(self):
1476 # There's not much we can do to test that it acts as expected,
1477 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001479 ctx.set_default_verify_paths()
1480
Antoine Pitrou501da612011-12-21 09:27:41 +01001481 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001482 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001483 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001484 ctx.set_ecdh_curve("prime256v1")
1485 ctx.set_ecdh_curve(b"prime256v1")
1486 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1487 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1488 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1489 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1490
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001491 @needs_sni
1492 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001493 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001494
1495 # set_servername_callback expects a callable, or None
1496 self.assertRaises(TypeError, ctx.set_servername_callback)
1497 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1498 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1499 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1500
1501 def dummycallback(sock, servername, ctx):
1502 pass
1503 ctx.set_servername_callback(None)
1504 ctx.set_servername_callback(dummycallback)
1505
1506 @needs_sni
1507 def test_sni_callback_refcycle(self):
1508 # Reference cycles through the servername callback are detected
1509 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001510 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001511 def dummycallback(sock, servername, ctx, cycle=ctx):
1512 pass
1513 ctx.set_servername_callback(dummycallback)
1514 wr = weakref.ref(ctx)
1515 del ctx, dummycallback
1516 gc.collect()
1517 self.assertIs(wr(), None)
1518
Christian Heimes9a5395a2013-06-17 15:44:12 +02001519 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001520 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001521 self.assertEqual(ctx.cert_store_stats(),
1522 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1523 ctx.load_cert_chain(CERTFILE)
1524 self.assertEqual(ctx.cert_store_stats(),
1525 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1526 ctx.load_verify_locations(CERTFILE)
1527 self.assertEqual(ctx.cert_store_stats(),
1528 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001529 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001530 self.assertEqual(ctx.cert_store_stats(),
1531 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1532
1533 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001534 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001535 self.assertEqual(ctx.get_ca_certs(), [])
1536 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1537 ctx.load_verify_locations(CERTFILE)
1538 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001539 # but CAFILE_CACERT is a CA cert
1540 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001541 self.assertEqual(ctx.get_ca_certs(),
1542 [{'issuer': ((('organizationName', 'Root CA'),),
1543 (('organizationalUnitName', 'http://www.cacert.org'),),
1544 (('commonName', 'CA Cert Signing Authority'),),
1545 (('emailAddress', 'support@cacert.org'),)),
1546 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1547 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1548 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001549 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001550 'subject': ((('organizationName', 'Root CA'),),
1551 (('organizationalUnitName', 'http://www.cacert.org'),),
1552 (('commonName', 'CA Cert Signing Authority'),),
1553 (('emailAddress', 'support@cacert.org'),)),
1554 'version': 3}])
1555
Martin Panterb55f8b72016-01-14 12:53:56 +00001556 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001557 pem = f.read()
1558 der = ssl.PEM_cert_to_DER_cert(pem)
1559 self.assertEqual(ctx.get_ca_certs(True), [der])
1560
Christian Heimes72d28502013-11-23 13:56:58 +01001561 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001562 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001563 ctx.load_default_certs()
1564
Christian Heimesa170fa12017-09-15 20:27:30 +02001565 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001566 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1567 ctx.load_default_certs()
1568
Christian Heimesa170fa12017-09-15 20:27:30 +02001569 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001570 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1571
Christian Heimesa170fa12017-09-15 20:27:30 +02001572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001573 self.assertRaises(TypeError, ctx.load_default_certs, None)
1574 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1575
Benjamin Peterson91244e02014-10-03 18:17:15 -04001576 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001577 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001578 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001579 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001580 with support.EnvironmentVarGuard() as env:
1581 env["SSL_CERT_DIR"] = CAPATH
1582 env["SSL_CERT_FILE"] = CERTFILE
1583 ctx.load_default_certs()
1584 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1585
Benjamin Peterson91244e02014-10-03 18:17:15 -04001586 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001587 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001588 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001589 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001590 ctx.load_default_certs()
1591 stats = ctx.cert_store_stats()
1592
Christian Heimesa170fa12017-09-15 20:27:30 +02001593 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001594 with support.EnvironmentVarGuard() as env:
1595 env["SSL_CERT_DIR"] = CAPATH
1596 env["SSL_CERT_FILE"] = CERTFILE
1597 ctx.load_default_certs()
1598 stats["x509"] += 1
1599 self.assertEqual(ctx.cert_store_stats(), stats)
1600
Christian Heimes358cfd42016-09-10 22:43:48 +02001601 def _assert_context_options(self, ctx):
1602 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1603 if OP_NO_COMPRESSION != 0:
1604 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1605 OP_NO_COMPRESSION)
1606 if OP_SINGLE_DH_USE != 0:
1607 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1608 OP_SINGLE_DH_USE)
1609 if OP_SINGLE_ECDH_USE != 0:
1610 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1611 OP_SINGLE_ECDH_USE)
1612 if OP_CIPHER_SERVER_PREFERENCE != 0:
1613 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1614 OP_CIPHER_SERVER_PREFERENCE)
1615
Christian Heimes4c05b472013-11-23 15:58:30 +01001616 def test_create_default_context(self):
1617 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001618
Christian Heimesa170fa12017-09-15 20:27:30 +02001619 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001620 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001621 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001622 self._assert_context_options(ctx)
1623
Christian Heimes4c05b472013-11-23 15:58:30 +01001624 with open(SIGNING_CA) as f:
1625 cadata = f.read()
1626 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1627 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001628 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001629 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001630 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001631
1632 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001633 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001634 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001635 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001636
Christian Heimes67986f92013-11-23 22:43:47 +01001637 def test__create_stdlib_context(self):
1638 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001639 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001640 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001641 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001642 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001643
1644 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1645 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1646 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001647 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001648
1649 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001650 cert_reqs=ssl.CERT_REQUIRED,
1651 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001652 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1653 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001654 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001655 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001656
1657 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001658 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001659 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001660 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001661
Christian Heimes1aa9a752013-12-02 02:41:19 +01001662 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001663 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001664 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001665 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001666
Christian Heimese82c0342017-09-15 20:29:57 +02001667 # Auto set CERT_REQUIRED
1668 ctx.check_hostname = True
1669 self.assertTrue(ctx.check_hostname)
1670 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1671 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001672 ctx.verify_mode = ssl.CERT_REQUIRED
1673 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001674 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001675
Christian Heimese82c0342017-09-15 20:29:57 +02001676 # Changing verify_mode does not affect check_hostname
1677 ctx.check_hostname = False
1678 ctx.verify_mode = ssl.CERT_NONE
1679 ctx.check_hostname = False
1680 self.assertFalse(ctx.check_hostname)
1681 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1682 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001683 ctx.check_hostname = True
1684 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001685 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1686
1687 ctx.check_hostname = False
1688 ctx.verify_mode = ssl.CERT_OPTIONAL
1689 ctx.check_hostname = False
1690 self.assertFalse(ctx.check_hostname)
1691 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1692 # keep CERT_OPTIONAL
1693 ctx.check_hostname = True
1694 self.assertTrue(ctx.check_hostname)
1695 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001696
1697 # Cannot set CERT_NONE with check_hostname enabled
1698 with self.assertRaises(ValueError):
1699 ctx.verify_mode = ssl.CERT_NONE
1700 ctx.check_hostname = False
1701 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001702 ctx.verify_mode = ssl.CERT_NONE
1703 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001704
Christian Heimes5fe668c2016-09-12 00:01:11 +02001705 def test_context_client_server(self):
1706 # PROTOCOL_TLS_CLIENT has sane defaults
1707 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1708 self.assertTrue(ctx.check_hostname)
1709 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1710
1711 # PROTOCOL_TLS_SERVER has different but also sane defaults
1712 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1713 self.assertFalse(ctx.check_hostname)
1714 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1715
Christian Heimes4df60f12017-09-15 20:26:05 +02001716 def test_context_custom_class(self):
1717 class MySSLSocket(ssl.SSLSocket):
1718 pass
1719
1720 class MySSLObject(ssl.SSLObject):
1721 pass
1722
1723 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1724 ctx.sslsocket_class = MySSLSocket
1725 ctx.sslobject_class = MySSLObject
1726
1727 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1728 self.assertIsInstance(sock, MySSLSocket)
1729 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1730 self.assertIsInstance(obj, MySSLObject)
1731
Christian Heimes78c7d522019-06-03 21:00:10 +02001732 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1733 def test_num_tickest(self):
1734 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1735 self.assertEqual(ctx.num_tickets, 2)
1736 ctx.num_tickets = 1
1737 self.assertEqual(ctx.num_tickets, 1)
1738 ctx.num_tickets = 0
1739 self.assertEqual(ctx.num_tickets, 0)
1740 with self.assertRaises(ValueError):
1741 ctx.num_tickets = -1
1742 with self.assertRaises(TypeError):
1743 ctx.num_tickets = None
1744
1745 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1746 self.assertEqual(ctx.num_tickets, 2)
1747 with self.assertRaises(ValueError):
1748 ctx.num_tickets = 1
1749
Antoine Pitrou152efa22010-05-16 18:19:27 +00001750
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001751class SSLErrorTests(unittest.TestCase):
1752
1753 def test_str(self):
1754 # The str() of a SSLError doesn't include the errno
1755 e = ssl.SSLError(1, "foo")
1756 self.assertEqual(str(e), "foo")
1757 self.assertEqual(e.errno, 1)
1758 # Same for a subclass
1759 e = ssl.SSLZeroReturnError(1, "foo")
1760 self.assertEqual(str(e), "foo")
1761 self.assertEqual(e.errno, 1)
1762
Paul Monsonf3550692019-06-19 13:09:54 -07001763 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001764 def test_lib_reason(self):
1765 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001766 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001767 with self.assertRaises(ssl.SSLError) as cm:
1768 ctx.load_dh_params(CERTFILE)
1769 self.assertEqual(cm.exception.library, 'PEM')
1770 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1771 s = str(cm.exception)
1772 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1773
1774 def test_subclass(self):
1775 # Check that the appropriate SSLError subclass is raised
1776 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001777 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1778 ctx.check_hostname = False
1779 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001780 with socket.create_server(("127.0.0.1", 0)) as s:
1781 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001782 c.setblocking(False)
1783 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001784 with self.assertRaises(ssl.SSLWantReadError) as cm:
1785 c.do_handshake()
1786 s = str(cm.exception)
1787 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1788 # For compatibility
1789 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1790
1791
Christian Heimes61d478c2018-01-27 15:51:38 +01001792 def test_bad_server_hostname(self):
1793 ctx = ssl.create_default_context()
1794 with self.assertRaises(ValueError):
1795 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1796 server_hostname="")
1797 with self.assertRaises(ValueError):
1798 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1799 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001800 with self.assertRaises(TypeError):
1801 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1802 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001803
1804
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001805class MemoryBIOTests(unittest.TestCase):
1806
1807 def test_read_write(self):
1808 bio = ssl.MemoryBIO()
1809 bio.write(b'foo')
1810 self.assertEqual(bio.read(), b'foo')
1811 self.assertEqual(bio.read(), b'')
1812 bio.write(b'foo')
1813 bio.write(b'bar')
1814 self.assertEqual(bio.read(), b'foobar')
1815 self.assertEqual(bio.read(), b'')
1816 bio.write(b'baz')
1817 self.assertEqual(bio.read(2), b'ba')
1818 self.assertEqual(bio.read(1), b'z')
1819 self.assertEqual(bio.read(1), b'')
1820
1821 def test_eof(self):
1822 bio = ssl.MemoryBIO()
1823 self.assertFalse(bio.eof)
1824 self.assertEqual(bio.read(), b'')
1825 self.assertFalse(bio.eof)
1826 bio.write(b'foo')
1827 self.assertFalse(bio.eof)
1828 bio.write_eof()
1829 self.assertFalse(bio.eof)
1830 self.assertEqual(bio.read(2), b'fo')
1831 self.assertFalse(bio.eof)
1832 self.assertEqual(bio.read(1), b'o')
1833 self.assertTrue(bio.eof)
1834 self.assertEqual(bio.read(), b'')
1835 self.assertTrue(bio.eof)
1836
1837 def test_pending(self):
1838 bio = ssl.MemoryBIO()
1839 self.assertEqual(bio.pending, 0)
1840 bio.write(b'foo')
1841 self.assertEqual(bio.pending, 3)
1842 for i in range(3):
1843 bio.read(1)
1844 self.assertEqual(bio.pending, 3-i-1)
1845 for i in range(3):
1846 bio.write(b'x')
1847 self.assertEqual(bio.pending, i+1)
1848 bio.read()
1849 self.assertEqual(bio.pending, 0)
1850
1851 def test_buffer_types(self):
1852 bio = ssl.MemoryBIO()
1853 bio.write(b'foo')
1854 self.assertEqual(bio.read(), b'foo')
1855 bio.write(bytearray(b'bar'))
1856 self.assertEqual(bio.read(), b'bar')
1857 bio.write(memoryview(b'baz'))
1858 self.assertEqual(bio.read(), b'baz')
1859
1860 def test_error_types(self):
1861 bio = ssl.MemoryBIO()
1862 self.assertRaises(TypeError, bio.write, 'foo')
1863 self.assertRaises(TypeError, bio.write, None)
1864 self.assertRaises(TypeError, bio.write, True)
1865 self.assertRaises(TypeError, bio.write, 1)
1866
1867
Christian Heimes9d50ab52018-02-27 10:17:30 +01001868class SSLObjectTests(unittest.TestCase):
1869 def test_private_init(self):
1870 bio = ssl.MemoryBIO()
1871 with self.assertRaisesRegex(TypeError, "public constructor"):
1872 ssl.SSLObject(bio, bio)
1873
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001874 def test_unwrap(self):
1875 client_ctx, server_ctx, hostname = testing_context()
1876 c_in = ssl.MemoryBIO()
1877 c_out = ssl.MemoryBIO()
1878 s_in = ssl.MemoryBIO()
1879 s_out = ssl.MemoryBIO()
1880 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1881 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1882
1883 # Loop on the handshake for a bit to get it settled
1884 for _ in range(5):
1885 try:
1886 client.do_handshake()
1887 except ssl.SSLWantReadError:
1888 pass
1889 if c_out.pending:
1890 s_in.write(c_out.read())
1891 try:
1892 server.do_handshake()
1893 except ssl.SSLWantReadError:
1894 pass
1895 if s_out.pending:
1896 c_in.write(s_out.read())
1897 # Now the handshakes should be complete (don't raise WantReadError)
1898 client.do_handshake()
1899 server.do_handshake()
1900
1901 # Now if we unwrap one side unilaterally, it should send close-notify
1902 # and raise WantReadError:
1903 with self.assertRaises(ssl.SSLWantReadError):
1904 client.unwrap()
1905
1906 # But server.unwrap() does not raise, because it reads the client's
1907 # close-notify:
1908 s_in.write(c_out.read())
1909 server.unwrap()
1910
1911 # And now that the client gets the server's close-notify, it doesn't
1912 # raise either.
1913 c_in.write(s_out.read())
1914 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001915
Martin Panter3840b2a2016-03-27 01:53:46 +00001916class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001917 """Tests that connect to a simple server running in the background"""
1918
1919 def setUp(self):
1920 server = ThreadedEchoServer(SIGNED_CERTFILE)
1921 self.server_addr = (HOST, server.port)
1922 server.__enter__()
1923 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001924
Antoine Pitrou480a1242010-04-28 21:37:09 +00001925 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001926 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001927 cert_reqs=ssl.CERT_NONE) as s:
1928 s.connect(self.server_addr)
1929 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001930 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001931
Martin Panter3840b2a2016-03-27 01:53:46 +00001932 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001933 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001934 cert_reqs=ssl.CERT_REQUIRED,
1935 ca_certs=SIGNING_CA) as s:
1936 s.connect(self.server_addr)
1937 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001938 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001939
Martin Panter3840b2a2016-03-27 01:53:46 +00001940 def test_connect_fail(self):
1941 # This should fail because we have no verification certs. Connection
1942 # failure crashes ThreadedEchoServer, so run this in an independent
1943 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001944 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001945 cert_reqs=ssl.CERT_REQUIRED)
1946 self.addCleanup(s.close)
1947 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1948 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001949
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001950 def test_connect_ex(self):
1951 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001952 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001953 cert_reqs=ssl.CERT_REQUIRED,
1954 ca_certs=SIGNING_CA)
1955 self.addCleanup(s.close)
1956 self.assertEqual(0, s.connect_ex(self.server_addr))
1957 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001958
1959 def test_non_blocking_connect_ex(self):
1960 # Issue #11326: non-blocking connect_ex() should allow handshake
1961 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001962 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001963 cert_reqs=ssl.CERT_REQUIRED,
1964 ca_certs=SIGNING_CA,
1965 do_handshake_on_connect=False)
1966 self.addCleanup(s.close)
1967 s.setblocking(False)
1968 rc = s.connect_ex(self.server_addr)
1969 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1970 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1971 # Wait for connect to finish
1972 select.select([], [s], [], 5.0)
1973 # Non-blocking handshake
1974 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001975 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001976 s.do_handshake()
1977 break
1978 except ssl.SSLWantReadError:
1979 select.select([s], [], [], 5.0)
1980 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001981 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001982 # SSL established
1983 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001984
Antoine Pitrou152efa22010-05-16 18:19:27 +00001985 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001986 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001987 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001988 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1989 s.connect(self.server_addr)
1990 self.assertEqual({}, s.getpeercert())
1991 # Same with a server hostname
1992 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1993 server_hostname="dummy") as s:
1994 s.connect(self.server_addr)
1995 ctx.verify_mode = ssl.CERT_REQUIRED
1996 # This should succeed because we specify the root cert
1997 ctx.load_verify_locations(SIGNING_CA)
1998 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1999 s.connect(self.server_addr)
2000 cert = s.getpeercert()
2001 self.assertTrue(cert)
2002
2003 def test_connect_with_context_fail(self):
2004 # This should fail because we have no verification certs. Connection
2005 # failure crashes ThreadedEchoServer, so run this in an independent
2006 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02002007 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002008 ctx.verify_mode = ssl.CERT_REQUIRED
2009 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
2010 self.addCleanup(s.close)
2011 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
2012 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00002013
2014 def test_connect_capath(self):
2015 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00002016 # NOTE: the subject hashing algorithm has been changed between
2017 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
2018 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00002019 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02002020 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002021 ctx.verify_mode = ssl.CERT_REQUIRED
2022 ctx.load_verify_locations(capath=CAPATH)
2023 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2024 s.connect(self.server_addr)
2025 cert = s.getpeercert()
2026 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02002027
Martin Panter3840b2a2016-03-27 01:53:46 +00002028 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02002029 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002030 ctx.verify_mode = ssl.CERT_REQUIRED
2031 ctx.load_verify_locations(capath=BYTES_CAPATH)
2032 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2033 s.connect(self.server_addr)
2034 cert = s.getpeercert()
2035 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002036
Christian Heimesefff7062013-11-21 03:35:02 +01002037 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002038 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01002039 pem = f.read()
2040 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02002041 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002042 ctx.verify_mode = ssl.CERT_REQUIRED
2043 ctx.load_verify_locations(cadata=pem)
2044 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2045 s.connect(self.server_addr)
2046 cert = s.getpeercert()
2047 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002048
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02002050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002051 ctx.verify_mode = ssl.CERT_REQUIRED
2052 ctx.load_verify_locations(cadata=der)
2053 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
2054 s.connect(self.server_addr)
2055 cert = s.getpeercert()
2056 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01002057
Antoine Pitroue3220242010-04-24 11:13:53 +00002058 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
2059 def test_makefile_close(self):
2060 # Issue #5238: creating a file-like object with makefile() shouldn't
2061 # delay closing the underlying "real socket" (here tested with its
2062 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02002063 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00002064 ss.connect(self.server_addr)
2065 fd = ss.fileno()
2066 f = ss.makefile()
2067 f.close()
2068 # The fd is still open
2069 os.read(fd, 0)
2070 # Closing the SSL socket should close the fd too
2071 ss.close()
2072 gc.collect()
2073 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00002074 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00002075 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00002076
Antoine Pitrou480a1242010-04-28 21:37:09 +00002077 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002078 s = socket.socket(socket.AF_INET)
2079 s.connect(self.server_addr)
2080 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02002081 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00002082 cert_reqs=ssl.CERT_NONE,
2083 do_handshake_on_connect=False)
2084 self.addCleanup(s.close)
2085 count = 0
2086 while True:
2087 try:
2088 count += 1
2089 s.do_handshake()
2090 break
2091 except ssl.SSLWantReadError:
2092 select.select([s], [], [])
2093 except ssl.SSLWantWriteError:
2094 select.select([], [s], [])
2095 if support.verbose:
2096 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002097
Antoine Pitrou480a1242010-04-28 21:37:09 +00002098 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002099 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002100
Martin Panter3840b2a2016-03-27 01:53:46 +00002101 def test_get_server_certificate_fail(self):
2102 # Connection failure crashes ThreadedEchoServer, so run this in an
2103 # independent test method
2104 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002105
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002106 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002107 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002108 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2109 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002110 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002111 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2112 s.connect(self.server_addr)
2113 # Error checking can happen at instantiation or when connecting
2114 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2115 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002116 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002117 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2118 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002119
Christian Heimes9a5395a2013-06-17 15:44:12 +02002120 def test_get_ca_certs_capath(self):
2121 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002122 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002123 ctx.load_verify_locations(capath=CAPATH)
2124 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002125 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2126 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002127 s.connect(self.server_addr)
2128 cert = s.getpeercert()
2129 self.assertTrue(cert)
2130 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002131
Christian Heimes575596e2013-12-15 21:49:17 +01002132 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002133 def test_context_setget(self):
2134 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002135 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2136 ctx1.load_verify_locations(capath=CAPATH)
2137 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2138 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002139 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002140 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002141 ss.connect(self.server_addr)
2142 self.assertIs(ss.context, ctx1)
2143 self.assertIs(ss._sslobj.context, ctx1)
2144 ss.context = ctx2
2145 self.assertIs(ss.context, ctx2)
2146 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002147
2148 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2149 # A simple IO loop. Call func(*args) depending on the error we get
2150 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2151 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002152 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002153 count = 0
2154 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002155 if time.monotonic() > deadline:
2156 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002157 errno = None
2158 count += 1
2159 try:
2160 ret = func(*args)
2161 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002162 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002163 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002164 raise
2165 errno = e.errno
2166 # Get any data from the outgoing BIO irrespective of any error, and
2167 # send it to the socket.
2168 buf = outgoing.read()
2169 sock.sendall(buf)
2170 # If there's no error, we're done. For WANT_READ, we need to get
2171 # data from the socket and put it in the incoming BIO.
2172 if errno is None:
2173 break
2174 elif errno == ssl.SSL_ERROR_WANT_READ:
2175 buf = sock.recv(32768)
2176 if buf:
2177 incoming.write(buf)
2178 else:
2179 incoming.write_eof()
2180 if support.verbose:
2181 sys.stdout.write("Needed %d calls to complete %s().\n"
2182 % (count, func.__name__))
2183 return ret
2184
Martin Panter3840b2a2016-03-27 01:53:46 +00002185 def test_bio_handshake(self):
2186 sock = socket.socket(socket.AF_INET)
2187 self.addCleanup(sock.close)
2188 sock.connect(self.server_addr)
2189 incoming = ssl.MemoryBIO()
2190 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002191 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2192 self.assertTrue(ctx.check_hostname)
2193 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002194 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002195 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2196 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002197 self.assertIs(sslobj._sslobj.owner, sslobj)
2198 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002199 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002200 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002201 self.assertRaises(ValueError, sslobj.getpeercert)
2202 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2203 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2204 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2205 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002206 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002207 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002208 self.assertTrue(sslobj.getpeercert())
2209 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2210 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2211 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002212 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002213 except ssl.SSLSyscallError:
2214 # If the server shuts down the TCP connection without sending a
2215 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2216 pass
2217 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2218
2219 def test_bio_read_write_data(self):
2220 sock = socket.socket(socket.AF_INET)
2221 self.addCleanup(sock.close)
2222 sock.connect(self.server_addr)
2223 incoming = ssl.MemoryBIO()
2224 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002225 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002226 ctx.verify_mode = ssl.CERT_NONE
2227 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2228 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2229 req = b'FOO\n'
2230 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2231 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2232 self.assertEqual(buf, b'foo\n')
2233 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002234
2235
Martin Panter3840b2a2016-03-27 01:53:46 +00002236class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002237
Martin Panter3840b2a2016-03-27 01:53:46 +00002238 def test_timeout_connect_ex(self):
2239 # Issue #12065: on a timeout, connect_ex() should return the original
2240 # errno (mimicking the behaviour of non-SSL sockets).
2241 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002242 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002243 cert_reqs=ssl.CERT_REQUIRED,
2244 do_handshake_on_connect=False)
2245 self.addCleanup(s.close)
2246 s.settimeout(0.0000001)
2247 rc = s.connect_ex((REMOTE_HOST, 443))
2248 if rc == 0:
2249 self.skipTest("REMOTE_HOST responded too quickly")
2250 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2251
2252 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2253 def test_get_server_certificate_ipv6(self):
2254 with support.transient_internet('ipv6.google.com'):
2255 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2256 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2257
Martin Panter3840b2a2016-03-27 01:53:46 +00002258
2259def _test_get_server_certificate(test, host, port, cert=None):
2260 pem = ssl.get_server_certificate((host, port))
2261 if not pem:
2262 test.fail("No server certificate on %s:%s!" % (host, port))
2263
2264 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2265 if not pem:
2266 test.fail("No server certificate on %s:%s!" % (host, port))
2267 if support.verbose:
2268 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2269
2270def _test_get_server_certificate_fail(test, host, port):
2271 try:
2272 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2273 except ssl.SSLError as x:
2274 #should fail
2275 if support.verbose:
2276 sys.stdout.write("%s\n" % x)
2277 else:
2278 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2279
2280
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002281from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002282
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002283class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002284
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002285 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002286
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002287 """A mildly complicated class, because we want it to work both
2288 with and without the SSL wrapper around the socket connection, so
2289 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002290
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002291 def __init__(self, server, connsock, addr):
2292 self.server = server
2293 self.running = False
2294 self.sock = connsock
2295 self.addr = addr
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03002296 self.sock.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002297 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002298 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002299 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002300
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002301 def wrap_conn(self):
2302 try:
2303 self.sslconn = self.server.context.wrap_socket(
2304 self.sock, server_side=True)
2305 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2306 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002307 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002308 # We treat ConnectionResetError as though it were an
2309 # SSLError - OpenSSL on Ubuntu abruptly closes the
2310 # connection when asked to use an unsupported protocol.
2311 #
Christian Heimes529525f2018-05-23 22:24:45 +02002312 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2313 # tries to send session tickets after handshake.
2314 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002315 #
2316 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2317 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002318 self.server.conn_errors.append(str(e))
2319 if self.server.chatty:
2320 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2321 self.running = False
2322 self.close()
2323 return False
2324 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 # OSError may occur with wrong protocols, e.g. both
2326 # sides use PROTOCOL_TLS_SERVER.
2327 #
2328 # XXX Various errors can have happened here, for example
2329 # a mismatching protocol version, an invalid certificate,
2330 # or a low-level bug. This should be made more discriminating.
2331 #
2332 # bpo-31323: Store the exception as string to prevent
2333 # a reference leak: server -> conn_errors -> exception
2334 # -> traceback -> self (ConnectionHandler) -> server
2335 self.server.conn_errors.append(str(e))
2336 if self.server.chatty:
2337 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2338 self.running = False
2339 self.server.stop()
2340 self.close()
2341 return False
2342 else:
2343 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2344 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2345 cert = self.sslconn.getpeercert()
2346 if support.verbose and self.server.chatty:
2347 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2348 cert_binary = self.sslconn.getpeercert(True)
2349 if support.verbose and self.server.chatty:
2350 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2351 cipher = self.sslconn.cipher()
2352 if support.verbose and self.server.chatty:
2353 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2354 sys.stdout.write(" server: selected protocol is now "
2355 + str(self.sslconn.selected_npn_protocol()) + "\n")
2356 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002357
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002358 def read(self):
2359 if self.sslconn:
2360 return self.sslconn.read()
2361 else:
2362 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002363
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002364 def write(self, bytes):
2365 if self.sslconn:
2366 return self.sslconn.write(bytes)
2367 else:
2368 return self.sock.send(bytes)
2369
2370 def close(self):
2371 if self.sslconn:
2372 self.sslconn.close()
2373 else:
2374 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002375
Antoine Pitrou480a1242010-04-28 21:37:09 +00002376 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002377 self.running = True
2378 if not self.server.starttls_server:
2379 if not self.wrap_conn():
2380 return
2381 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002382 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002383 msg = self.read()
2384 stripped = msg.strip()
2385 if not stripped:
2386 # eof, so quit this handler
2387 self.running = False
2388 try:
2389 self.sock = self.sslconn.unwrap()
2390 except OSError:
2391 # Many tests shut the TCP connection down
2392 # without an SSL shutdown. This causes
2393 # unwrap() to raise OSError with errno=0!
2394 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002395 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002396 self.sslconn = None
2397 self.close()
2398 elif stripped == b'over':
2399 if support.verbose and self.server.connectionchatty:
2400 sys.stdout.write(" server: client closed connection\n")
2401 self.close()
2402 return
2403 elif (self.server.starttls_server and
2404 stripped == b'STARTTLS'):
2405 if support.verbose and self.server.connectionchatty:
2406 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2407 self.write(b"OK\n")
2408 if not self.wrap_conn():
2409 return
2410 elif (self.server.starttls_server and self.sslconn
2411 and stripped == b'ENDTLS'):
2412 if support.verbose and self.server.connectionchatty:
2413 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2414 self.write(b"OK\n")
2415 self.sock = self.sslconn.unwrap()
2416 self.sslconn = None
2417 if support.verbose and self.server.connectionchatty:
2418 sys.stdout.write(" server: connection is now unencrypted...\n")
2419 elif stripped == b'CB tls-unique':
2420 if support.verbose and self.server.connectionchatty:
2421 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2422 data = self.sslconn.get_channel_binding("tls-unique")
2423 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002424 elif stripped == b'PHA':
2425 if support.verbose and self.server.connectionchatty:
2426 sys.stdout.write(" server: initiating post handshake auth\n")
2427 try:
2428 self.sslconn.verify_client_post_handshake()
2429 except ssl.SSLError as e:
2430 self.write(repr(e).encode("us-ascii") + b"\n")
2431 else:
2432 self.write(b"OK\n")
2433 elif stripped == b'HASCERT':
2434 if self.sslconn.getpeercert() is not None:
2435 self.write(b'TRUE\n')
2436 else:
2437 self.write(b'FALSE\n')
2438 elif stripped == b'GETCERT':
2439 cert = self.sslconn.getpeercert()
2440 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 else:
2442 if (support.verbose and
2443 self.server.connectionchatty):
2444 ctype = (self.sslconn and "encrypted") or "unencrypted"
2445 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2446 % (msg, ctype, msg.lower(), ctype))
2447 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002448 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002449 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2450 # when connection is not shut down gracefully.
2451 if self.server.chatty and support.verbose:
2452 sys.stdout.write(
2453 " Connection reset by peer: {}\n".format(
2454 self.addr)
2455 )
2456 self.close()
2457 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002458 except ssl.SSLError as err:
2459 # On Windows sometimes test_pha_required_nocert receives the
2460 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2461 # before the 'tlsv13 alert certificate required' exception.
2462 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2463 # is received test_pha_required_nocert fails with ConnectionResetError
2464 # because the underlying socket is closed
2465 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2466 if self.server.chatty and support.verbose:
2467 sys.stdout.write(err.args[1])
2468 # test_pha_required_nocert is expecting this exception
2469 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002470 except OSError:
2471 if self.server.chatty:
2472 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002473 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002474 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002475
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002476 # normally, we'd just stop here, but for the test
2477 # harness, we want to stop the server
2478 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002479
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002480 def __init__(self, certificate=None, ssl_version=None,
2481 certreqs=None, cacerts=None,
2482 chatty=True, connectionchatty=False, starttls_server=False,
2483 npn_protocols=None, alpn_protocols=None,
2484 ciphers=None, context=None):
2485 if context:
2486 self.context = context
2487 else:
2488 self.context = ssl.SSLContext(ssl_version
2489 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002490 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002491 self.context.verify_mode = (certreqs if certreqs is not None
2492 else ssl.CERT_NONE)
2493 if cacerts:
2494 self.context.load_verify_locations(cacerts)
2495 if certificate:
2496 self.context.load_cert_chain(certificate)
2497 if npn_protocols:
2498 self.context.set_npn_protocols(npn_protocols)
2499 if alpn_protocols:
2500 self.context.set_alpn_protocols(alpn_protocols)
2501 if ciphers:
2502 self.context.set_ciphers(ciphers)
2503 self.chatty = chatty
2504 self.connectionchatty = connectionchatty
2505 self.starttls_server = starttls_server
2506 self.sock = socket.socket()
2507 self.port = support.bind_port(self.sock)
2508 self.flag = None
2509 self.active = False
2510 self.selected_npn_protocols = []
2511 self.selected_alpn_protocols = []
2512 self.shared_ciphers = []
2513 self.conn_errors = []
2514 threading.Thread.__init__(self)
2515 self.daemon = True
2516
2517 def __enter__(self):
2518 self.start(threading.Event())
2519 self.flag.wait()
2520 return self
2521
2522 def __exit__(self, *args):
2523 self.stop()
2524 self.join()
2525
2526 def start(self, flag=None):
2527 self.flag = flag
2528 threading.Thread.start(self)
2529
2530 def run(self):
2531 self.sock.settimeout(0.05)
2532 self.sock.listen()
2533 self.active = True
2534 if self.flag:
2535 # signal an event
2536 self.flag.set()
2537 while self.active:
2538 try:
2539 newconn, connaddr = self.sock.accept()
2540 if support.verbose and self.chatty:
2541 sys.stdout.write(' server: new connection from '
2542 + repr(connaddr) + '\n')
2543 handler = self.ConnectionHandler(self, newconn, connaddr)
2544 handler.start()
2545 handler.join()
2546 except socket.timeout:
2547 pass
2548 except KeyboardInterrupt:
2549 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002550 except BaseException as e:
2551 if support.verbose and self.chatty:
2552 sys.stdout.write(
2553 ' connection handling failed: ' + repr(e) + '\n')
2554
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002555 self.sock.close()
2556
2557 def stop(self):
2558 self.active = False
2559
2560class AsyncoreEchoServer(threading.Thread):
2561
2562 # this one's based on asyncore.dispatcher
2563
2564 class EchoServer (asyncore.dispatcher):
2565
2566 class ConnectionHandler(asyncore.dispatcher_with_send):
2567
2568 def __init__(self, conn, certfile):
2569 self.socket = test_wrap_socket(conn, server_side=True,
2570 certfile=certfile,
2571 do_handshake_on_connect=False)
2572 asyncore.dispatcher_with_send.__init__(self, self.socket)
2573 self._ssl_accepting = True
2574 self._do_ssl_handshake()
2575
2576 def readable(self):
2577 if isinstance(self.socket, ssl.SSLSocket):
2578 while self.socket.pending() > 0:
2579 self.handle_read_event()
2580 return True
2581
2582 def _do_ssl_handshake(self):
2583 try:
2584 self.socket.do_handshake()
2585 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2586 return
2587 except ssl.SSLEOFError:
2588 return self.handle_close()
2589 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002590 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002591 except OSError as err:
2592 if err.args[0] == errno.ECONNABORTED:
2593 return self.handle_close()
2594 else:
2595 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002596
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002597 def handle_read(self):
2598 if self._ssl_accepting:
2599 self._do_ssl_handshake()
2600 else:
2601 data = self.recv(1024)
2602 if support.verbose:
2603 sys.stdout.write(" server: read %s from client\n" % repr(data))
2604 if not data:
2605 self.close()
2606 else:
2607 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002608
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002609 def handle_close(self):
2610 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002611 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002612 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002613
2614 def handle_error(self):
2615 raise
2616
Trent Nelson78520002008-04-10 20:54:35 +00002617 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002618 self.certfile = certfile
2619 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2620 self.port = support.bind_port(sock, '')
2621 asyncore.dispatcher.__init__(self, sock)
2622 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002623
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002624 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002625 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002626 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2627 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002628
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002629 def handle_error(self):
2630 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002631
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002632 def __init__(self, certfile):
2633 self.flag = None
2634 self.active = False
2635 self.server = self.EchoServer(certfile)
2636 self.port = self.server.port
2637 threading.Thread.__init__(self)
2638 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002639
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002640 def __str__(self):
2641 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002642
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002643 def __enter__(self):
2644 self.start(threading.Event())
2645 self.flag.wait()
2646 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002647
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002648 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002649 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002650 sys.stdout.write(" cleanup: stopping server.\n")
2651 self.stop()
2652 if support.verbose:
2653 sys.stdout.write(" cleanup: joining server thread.\n")
2654 self.join()
2655 if support.verbose:
2656 sys.stdout.write(" cleanup: successfully joined.\n")
2657 # make sure that ConnectionHandler is removed from socket_map
2658 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002659
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002660 def start (self, flag=None):
2661 self.flag = flag
2662 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002663
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664 def run(self):
2665 self.active = True
2666 if self.flag:
2667 self.flag.set()
2668 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002669 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002670 asyncore.loop(1)
2671 except:
2672 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002673
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002674 def stop(self):
2675 self.active = False
2676 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002677
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002678def server_params_test(client_context, server_context, indata=b"FOO\n",
2679 chatty=True, connectionchatty=False, sni_name=None,
2680 session=None):
2681 """
2682 Launch a server, connect a client to it and try various reads
2683 and writes.
2684 """
2685 stats = {}
2686 server = ThreadedEchoServer(context=server_context,
2687 chatty=chatty,
2688 connectionchatty=False)
2689 with server:
2690 with client_context.wrap_socket(socket.socket(),
2691 server_hostname=sni_name, session=session) as s:
2692 s.connect((HOST, server.port))
2693 for arg in [indata, bytearray(indata), memoryview(indata)]:
2694 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002695 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002696 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002697 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002699 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 if connectionchatty:
2701 if support.verbose:
2702 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002703 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002704 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002705 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2706 % (outdata[:20], len(outdata),
2707 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002708 s.write(b"over\n")
2709 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002710 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002711 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002712 stats.update({
2713 'compression': s.compression(),
2714 'cipher': s.cipher(),
2715 'peercert': s.getpeercert(),
2716 'client_alpn_protocol': s.selected_alpn_protocol(),
2717 'client_npn_protocol': s.selected_npn_protocol(),
2718 'version': s.version(),
2719 'session_reused': s.session_reused,
2720 'session': s.session,
2721 })
2722 s.close()
2723 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2724 stats['server_npn_protocols'] = server.selected_npn_protocols
2725 stats['server_shared_ciphers'] = server.shared_ciphers
2726 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002727
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002728def try_protocol_combo(server_protocol, client_protocol, expect_success,
2729 certsreqs=None, server_options=0, client_options=0):
2730 """
2731 Try to SSL-connect using *client_protocol* to *server_protocol*.
2732 If *expect_success* is true, assert that the connection succeeds,
2733 if it's false, assert that the connection fails.
2734 Also, if *expect_success* is a string, assert that it is the protocol
2735 version actually used by the connection.
2736 """
2737 if certsreqs is None:
2738 certsreqs = ssl.CERT_NONE
2739 certtype = {
2740 ssl.CERT_NONE: "CERT_NONE",
2741 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2742 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2743 }[certsreqs]
2744 if support.verbose:
2745 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2746 sys.stdout.write(formatstr %
2747 (ssl.get_protocol_name(client_protocol),
2748 ssl.get_protocol_name(server_protocol),
2749 certtype))
2750 client_context = ssl.SSLContext(client_protocol)
2751 client_context.options |= client_options
2752 server_context = ssl.SSLContext(server_protocol)
2753 server_context.options |= server_options
2754
Victor Stinner3ef63442019-02-19 18:06:03 +01002755 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2756 if (min_version is not None
2757 # SSLContext.minimum_version is only available on recent OpenSSL
2758 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2759 and hasattr(server_context, 'minimum_version')
2760 and server_protocol == ssl.PROTOCOL_TLS
2761 and server_context.minimum_version > min_version):
2762 # If OpenSSL configuration is strict and requires more recent TLS
2763 # version, we have to change the minimum to test old TLS versions.
2764 server_context.minimum_version = min_version
2765
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002766 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2767 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2768 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002769 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002770 client_context.set_ciphers("ALL")
2771
2772 for ctx in (client_context, server_context):
2773 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002774 ctx.load_cert_chain(SIGNED_CERTFILE)
2775 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002776 try:
2777 stats = server_params_test(client_context, server_context,
2778 chatty=False, connectionchatty=False)
2779 # Protocol mismatch can result in either an SSLError, or a
2780 # "Connection reset by peer" error.
2781 except ssl.SSLError:
2782 if expect_success:
2783 raise
2784 except OSError as e:
2785 if expect_success or e.errno != errno.ECONNRESET:
2786 raise
2787 else:
2788 if not expect_success:
2789 raise AssertionError(
2790 "Client protocol %s succeeded with server protocol %s!"
2791 % (ssl.get_protocol_name(client_protocol),
2792 ssl.get_protocol_name(server_protocol)))
2793 elif (expect_success is not True
2794 and expect_success != stats['version']):
2795 raise AssertionError("version mismatch: expected %r, got %r"
2796 % (expect_success, stats['version']))
2797
2798
2799class ThreadedTests(unittest.TestCase):
2800
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002801 def test_echo(self):
2802 """Basic test of an SSL client connecting to a server"""
2803 if support.verbose:
2804 sys.stdout.write("\n")
2805 for protocol in PROTOCOLS:
2806 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2807 continue
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02002808 if not has_tls_protocol(protocol):
2809 continue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002810 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2811 context = ssl.SSLContext(protocol)
2812 context.load_cert_chain(CERTFILE)
2813 server_params_test(context, context,
2814 chatty=True, connectionchatty=True)
2815
Christian Heimesa170fa12017-09-15 20:27:30 +02002816 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002817
2818 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2819 server_params_test(client_context=client_context,
2820 server_context=server_context,
2821 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002822 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002823
2824 client_context.check_hostname = False
2825 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2826 with self.assertRaises(ssl.SSLError) as e:
2827 server_params_test(client_context=server_context,
2828 server_context=client_context,
2829 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002830 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002831 self.assertIn('called a function you should not call',
2832 str(e.exception))
2833
2834 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2835 with self.assertRaises(ssl.SSLError) as e:
2836 server_params_test(client_context=server_context,
2837 server_context=server_context,
2838 chatty=True, connectionchatty=True)
2839 self.assertIn('called a function you should not call',
2840 str(e.exception))
2841
2842 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2843 with self.assertRaises(ssl.SSLError) as e:
2844 server_params_test(client_context=server_context,
2845 server_context=client_context,
2846 chatty=True, connectionchatty=True)
2847 self.assertIn('called a function you should not call',
2848 str(e.exception))
2849
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002850 def test_getpeercert(self):
2851 if support.verbose:
2852 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002853
2854 client_context, server_context, hostname = testing_context()
2855 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002856 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002857 with client_context.wrap_socket(socket.socket(),
2858 do_handshake_on_connect=False,
2859 server_hostname=hostname) as s:
2860 s.connect((HOST, server.port))
2861 # getpeercert() raise ValueError while the handshake isn't
2862 # done.
2863 with self.assertRaises(ValueError):
2864 s.getpeercert()
2865 s.do_handshake()
2866 cert = s.getpeercert()
2867 self.assertTrue(cert, "Can't get peer certificate.")
2868 cipher = s.cipher()
2869 if support.verbose:
2870 sys.stdout.write(pprint.pformat(cert) + '\n')
2871 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2872 if 'subject' not in cert:
2873 self.fail("No subject field in certificate: %s." %
2874 pprint.pformat(cert))
2875 if ((('organizationName', 'Python Software Foundation'),)
2876 not in cert['subject']):
2877 self.fail(
2878 "Missing or invalid 'organizationName' field in certificate subject; "
2879 "should be 'Python Software Foundation'.")
2880 self.assertIn('notBefore', cert)
2881 self.assertIn('notAfter', cert)
2882 before = ssl.cert_time_to_seconds(cert['notBefore'])
2883 after = ssl.cert_time_to_seconds(cert['notAfter'])
2884 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002885
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002886 @unittest.skipUnless(have_verify_flags(),
2887 "verify_flags need OpenSSL > 0.9.8")
2888 def test_crl_check(self):
2889 if support.verbose:
2890 sys.stdout.write("\n")
2891
Christian Heimesa170fa12017-09-15 20:27:30 +02002892 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002893
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002894 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002895 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896
2897 # VERIFY_DEFAULT should pass
2898 server = ThreadedEchoServer(context=server_context, chatty=True)
2899 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002900 with client_context.wrap_socket(socket.socket(),
2901 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002902 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002903 cert = s.getpeercert()
2904 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002905
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002906 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002907 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002908
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002909 server = ThreadedEchoServer(context=server_context, chatty=True)
2910 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002911 with client_context.wrap_socket(socket.socket(),
2912 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002913 with self.assertRaisesRegex(ssl.SSLError,
2914 "certificate verify failed"):
2915 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002916
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002917 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002918 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002919
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002920 server = ThreadedEchoServer(context=server_context, chatty=True)
2921 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002922 with client_context.wrap_socket(socket.socket(),
2923 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002924 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002925 cert = s.getpeercert()
2926 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002927
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002928 def test_check_hostname(self):
2929 if support.verbose:
2930 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002931
Christian Heimesa170fa12017-09-15 20:27:30 +02002932 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002933
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 # correct hostname should verify
2935 server = ThreadedEchoServer(context=server_context, chatty=True)
2936 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002937 with client_context.wrap_socket(socket.socket(),
2938 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002939 s.connect((HOST, server.port))
2940 cert = s.getpeercert()
2941 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002942
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 # incorrect hostname should raise an exception
2944 server = ThreadedEchoServer(context=server_context, chatty=True)
2945 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002946 with client_context.wrap_socket(socket.socket(),
2947 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002948 with self.assertRaisesRegex(
2949 ssl.CertificateError,
2950 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002951 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002952
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 # missing server_hostname arg should cause an exception, too
2954 server = ThreadedEchoServer(context=server_context, chatty=True)
2955 with server:
2956 with socket.socket() as s:
2957 with self.assertRaisesRegex(ValueError,
2958 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002959 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002960
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002961 def test_ecc_cert(self):
2962 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2963 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002964 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002965 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2966
2967 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2968 # load ECC cert
2969 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2970
2971 # correct hostname should verify
2972 server = ThreadedEchoServer(context=server_context, chatty=True)
2973 with server:
2974 with client_context.wrap_socket(socket.socket(),
2975 server_hostname=hostname) as s:
2976 s.connect((HOST, server.port))
2977 cert = s.getpeercert()
2978 self.assertTrue(cert, "Can't get peer certificate.")
2979 cipher = s.cipher()[0].split('-')
2980 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2981
2982 def test_dual_rsa_ecc(self):
2983 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2984 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002985 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2986 # algorithms.
2987 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002988 # only ECDSA certs
2989 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2990 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2991
2992 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2993 # load ECC and RSA key/cert pairs
2994 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2995 server_context.load_cert_chain(SIGNED_CERTFILE)
2996
2997 # correct hostname should verify
2998 server = ThreadedEchoServer(context=server_context, chatty=True)
2999 with server:
3000 with client_context.wrap_socket(socket.socket(),
3001 server_hostname=hostname) as s:
3002 s.connect((HOST, server.port))
3003 cert = s.getpeercert()
3004 self.assertTrue(cert, "Can't get peer certificate.")
3005 cipher = s.cipher()[0].split('-')
3006 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
3007
Christian Heimes66e57422018-01-29 14:25:13 +01003008 def test_check_hostname_idn(self):
3009 if support.verbose:
3010 sys.stdout.write("\n")
3011
Christian Heimes11a14932018-02-24 02:35:08 +01003012 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01003013 server_context.load_cert_chain(IDNSANSFILE)
3014
Christian Heimes11a14932018-02-24 02:35:08 +01003015 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01003016 context.verify_mode = ssl.CERT_REQUIRED
3017 context.check_hostname = True
3018 context.load_verify_locations(SIGNING_CA)
3019
3020 # correct hostname should verify, when specified in several
3021 # different ways
3022 idn_hostnames = [
3023 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003024 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003025 ('xn--knig-5qa.idn.pythontest.net',
3026 'xn--knig-5qa.idn.pythontest.net'),
3027 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003028 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003029
3030 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003031 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01003032 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
3033 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3034 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01003035 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
3036
3037 # ('königsgäßchen.idna2008.pythontest.net',
3038 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3039 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3040 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3041 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
3042 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
3043
Christian Heimes66e57422018-01-29 14:25:13 +01003044 ]
3045 for server_hostname, expected_hostname in idn_hostnames:
3046 server = ThreadedEchoServer(context=server_context, chatty=True)
3047 with server:
3048 with context.wrap_socket(socket.socket(),
3049 server_hostname=server_hostname) as s:
3050 self.assertEqual(s.server_hostname, expected_hostname)
3051 s.connect((HOST, server.port))
3052 cert = s.getpeercert()
3053 self.assertEqual(s.server_hostname, expected_hostname)
3054 self.assertTrue(cert, "Can't get peer certificate.")
3055
Christian Heimes66e57422018-01-29 14:25:13 +01003056 # incorrect hostname should raise an exception
3057 server = ThreadedEchoServer(context=server_context, chatty=True)
3058 with server:
3059 with context.wrap_socket(socket.socket(),
3060 server_hostname="python.example.org") as s:
3061 with self.assertRaises(ssl.CertificateError):
3062 s.connect((HOST, server.port))
3063
Christian Heimes529525f2018-05-23 22:24:45 +02003064 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003065 """Connecting when the server rejects the client's certificate
3066
3067 Launch a server with CERT_REQUIRED, and check that trying to
3068 connect to it with a wrong client certificate fails.
3069 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01003070 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003071 # load client cert that is not signed by trusted CA
3072 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003073 # require TLS client authentication
3074 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02003075 # TLS 1.3 has different handshake
3076 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01003077
3078 server = ThreadedEchoServer(
3079 context=server_context, chatty=True, connectionchatty=True,
3080 )
3081
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003082 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01003083 client_context.wrap_socket(socket.socket(),
3084 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003085 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 # Expect either an SSL error about the server rejecting
3087 # the connection, or a low-level connection reset (which
3088 # sometimes happens on Windows)
3089 s.connect((HOST, server.port))
3090 except ssl.SSLError as e:
3091 if support.verbose:
3092 sys.stdout.write("\nSSLError is %r\n" % e)
3093 except OSError as e:
3094 if e.errno != errno.ECONNRESET:
3095 raise
3096 if support.verbose:
3097 sys.stdout.write("\nsocket.error is %r\n" % e)
3098 else:
3099 self.fail("Use of invalid cert should have failed!")
3100
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003101 @requires_tls_version('TLSv1_3')
Christian Heimes529525f2018-05-23 22:24:45 +02003102 def test_wrong_cert_tls13(self):
3103 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003104 # load client cert that is not signed by trusted CA
3105 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003106 server_context.verify_mode = ssl.CERT_REQUIRED
3107 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3108 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3109
3110 server = ThreadedEchoServer(
3111 context=server_context, chatty=True, connectionchatty=True,
3112 )
3113 with server, \
3114 client_context.wrap_socket(socket.socket(),
3115 server_hostname=hostname) as s:
3116 # TLS 1.3 perform client cert exchange after handshake
3117 s.connect((HOST, server.port))
3118 try:
3119 s.write(b'data')
3120 s.read(4)
3121 except ssl.SSLError as e:
3122 if support.verbose:
3123 sys.stdout.write("\nSSLError is %r\n" % e)
3124 except OSError as e:
3125 if e.errno != errno.ECONNRESET:
3126 raise
3127 if support.verbose:
3128 sys.stdout.write("\nsocket.error is %r\n" % e)
3129 else:
3130 self.fail("Use of invalid cert should have failed!")
3131
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003132 def test_rude_shutdown(self):
3133 """A brutal shutdown of an SSL server should raise an OSError
3134 in the client when attempting handshake.
3135 """
3136 listener_ready = threading.Event()
3137 listener_gone = threading.Event()
3138
3139 s = socket.socket()
3140 port = support.bind_port(s, HOST)
3141
3142 # `listener` runs in a thread. It sits in an accept() until
3143 # the main thread connects. Then it rudely closes the socket,
3144 # and sets Event `listener_gone` to let the main thread know
3145 # the socket is gone.
3146 def listener():
3147 s.listen()
3148 listener_ready.set()
3149 newsock, addr = s.accept()
3150 newsock.close()
3151 s.close()
3152 listener_gone.set()
3153
3154 def connector():
3155 listener_ready.wait()
3156 with socket.socket() as c:
3157 c.connect((HOST, port))
3158 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003159 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003160 ssl_sock = test_wrap_socket(c)
3161 except OSError:
3162 pass
3163 else:
3164 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003165
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003166 t = threading.Thread(target=listener)
3167 t.start()
3168 try:
3169 connector()
3170 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003171 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003172
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003173 def test_ssl_cert_verify_error(self):
3174 if support.verbose:
3175 sys.stdout.write("\n")
3176
3177 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3178 server_context.load_cert_chain(SIGNED_CERTFILE)
3179
3180 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3181
3182 server = ThreadedEchoServer(context=server_context, chatty=True)
3183 with server:
3184 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003185 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003186 try:
3187 s.connect((HOST, server.port))
3188 except ssl.SSLError as e:
3189 msg = 'unable to get local issuer certificate'
3190 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3191 self.assertEqual(e.verify_code, 20)
3192 self.assertEqual(e.verify_message, msg)
3193 self.assertIn(msg, repr(e))
3194 self.assertIn('certificate verify failed', repr(e))
3195
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003196 @requires_tls_version('SSLv2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003197 def test_protocol_sslv2(self):
3198 """Connecting to an SSLv2 server with various client options"""
3199 if support.verbose:
3200 sys.stdout.write("\n")
3201 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3202 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3203 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003204 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003205 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003206 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3207 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3208 # SSLv23 client with specific SSL options
3209 if no_sslv2_implies_sslv3_hello():
3210 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003211 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003212 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003213 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003214 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003215 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003217
Christian Heimesa170fa12017-09-15 20:27:30 +02003218 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 """Connecting to an SSLv23 server with various client options"""
3220 if support.verbose:
3221 sys.stdout.write("\n")
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003222 if has_tls_version('SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003223 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003224 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003225 except OSError as x:
3226 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3227 if support.verbose:
3228 sys.stdout.write(
3229 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3230 % str(x))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003231 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003232 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3233 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003234 if has_tls_version('TLSv1'):
3235 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003236
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003237 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003238 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3239 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003240 if has_tls_version('TLSv1'):
3241 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003242
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003243 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003244 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3245 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003246 if has_tls_version('TLSv1'):
3247 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003248
3249 # Server with specific SSL options
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003250 if has_tls_version('SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003251 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003252 server_options=ssl.OP_NO_SSLv3)
3253 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003254 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003255 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003256 if has_tls_version('TLSv1'):
3257 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
3258 server_options=ssl.OP_NO_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003259
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003260 @requires_tls_version('SSLv3')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003261 def test_protocol_sslv3(self):
3262 """Connecting to an SSLv3 server with various client options"""
3263 if support.verbose:
3264 sys.stdout.write("\n")
3265 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3266 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3267 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003268 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003269 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003270 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003271 client_options=ssl.OP_NO_SSLv3)
3272 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3273 if no_sslv2_implies_sslv3_hello():
3274 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003275 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003276 False, client_options=ssl.OP_NO_SSLv2)
3277
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003278 @requires_tls_version('TLSv1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003279 def test_protocol_tlsv1(self):
3280 """Connecting to a TLSv1 server with various client options"""
3281 if support.verbose:
3282 sys.stdout.write("\n")
3283 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3284 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3285 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003286 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003287 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003288 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003290 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003291 client_options=ssl.OP_NO_TLSv1)
3292
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003293 @requires_tls_version('TLSv1_1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003294 def test_protocol_tlsv1_1(self):
3295 """Connecting to a TLSv1.1 server with various client options.
3296 Testing against older TLS versions."""
3297 if support.verbose:
3298 sys.stdout.write("\n")
3299 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003300 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003301 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003302 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003303 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003304 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003305 client_options=ssl.OP_NO_TLSv1_1)
3306
Christian Heimesa170fa12017-09-15 20:27:30 +02003307 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003308 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3309 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003311 @requires_tls_version('TLSv1_2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 def test_protocol_tlsv1_2(self):
3313 """Connecting to a TLSv1.2 server with various client options.
3314 Testing against older TLS versions."""
3315 if support.verbose:
3316 sys.stdout.write("\n")
3317 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3318 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3319 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003320 if has_tls_version('SSLv2'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003321 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003322 if has_tls_version('SSLv3'):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003323 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003324 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 client_options=ssl.OP_NO_TLSv1_2)
3326
Christian Heimesa170fa12017-09-15 20:27:30 +02003327 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003328 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3329 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3330 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3331 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3332
3333 def test_starttls(self):
3334 """Switching from clear text to encrypted and back again."""
3335 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3336
3337 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003338 starttls_server=True,
3339 chatty=True,
3340 connectionchatty=True)
3341 wrapped = False
3342 with server:
3343 s = socket.socket()
Serhiy Storchaka5eca7f32019-09-01 12:12:52 +03003344 s.setblocking(True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003345 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003346 if support.verbose:
3347 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003348 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003349 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003350 sys.stdout.write(
3351 " client: sending %r...\n" % indata)
3352 if wrapped:
3353 conn.write(indata)
3354 outdata = conn.read()
3355 else:
3356 s.send(indata)
3357 outdata = s.recv(1024)
3358 msg = outdata.strip().lower()
3359 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3360 # STARTTLS ok, switch to secure mode
3361 if support.verbose:
3362 sys.stdout.write(
3363 " client: read %r from server, starting TLS...\n"
3364 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003365 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003366 wrapped = True
3367 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3368 # ENDTLS ok, switch back to clear text
3369 if support.verbose:
3370 sys.stdout.write(
3371 " client: read %r from server, ending TLS...\n"
3372 % msg)
3373 s = conn.unwrap()
3374 wrapped = False
3375 else:
3376 if support.verbose:
3377 sys.stdout.write(
3378 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003379 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003380 sys.stdout.write(" client: closing connection.\n")
3381 if wrapped:
3382 conn.write(b"over\n")
3383 else:
3384 s.send(b"over\n")
3385 if wrapped:
3386 conn.close()
3387 else:
3388 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003389
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003390 def test_socketserver(self):
3391 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003392 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003393 # try to connect
3394 if support.verbose:
3395 sys.stdout.write('\n')
3396 with open(CERTFILE, 'rb') as f:
3397 d1 = f.read()
3398 d2 = ''
3399 # now fetch the same data from the HTTPS server
3400 url = 'https://localhost:%d/%s' % (
3401 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003402 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003403 f = urllib.request.urlopen(url, context=context)
3404 try:
3405 dlen = f.info().get("content-length")
3406 if dlen and (int(dlen) > 0):
3407 d2 = f.read(int(dlen))
3408 if support.verbose:
3409 sys.stdout.write(
3410 " client: read %d bytes from remote server '%s'\n"
3411 % (len(d2), server))
3412 finally:
3413 f.close()
3414 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003415
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003416 def test_asyncore_server(self):
3417 """Check the example asyncore integration."""
3418 if support.verbose:
3419 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003420
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003421 indata = b"FOO\n"
3422 server = AsyncoreEchoServer(CERTFILE)
3423 with server:
3424 s = test_wrap_socket(socket.socket())
3425 s.connect(('127.0.0.1', server.port))
3426 if support.verbose:
3427 sys.stdout.write(
3428 " client: sending %r...\n" % indata)
3429 s.write(indata)
3430 outdata = s.read()
3431 if support.verbose:
3432 sys.stdout.write(" client: read %r\n" % outdata)
3433 if outdata != indata.lower():
3434 self.fail(
3435 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3436 % (outdata[:20], len(outdata),
3437 indata[:20].lower(), len(indata)))
3438 s.write(b"over\n")
3439 if support.verbose:
3440 sys.stdout.write(" client: closing connection.\n")
3441 s.close()
3442 if support.verbose:
3443 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003444
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003445 def test_recv_send(self):
3446 """Test recv(), send() and friends."""
3447 if support.verbose:
3448 sys.stdout.write("\n")
3449
3450 server = ThreadedEchoServer(CERTFILE,
3451 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003452 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003453 cacerts=CERTFILE,
3454 chatty=True,
3455 connectionchatty=False)
3456 with server:
3457 s = test_wrap_socket(socket.socket(),
3458 server_side=False,
3459 certfile=CERTFILE,
3460 ca_certs=CERTFILE,
3461 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003462 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003463 s.connect((HOST, server.port))
3464 # helper methods for standardising recv* method signatures
3465 def _recv_into():
3466 b = bytearray(b"\0"*100)
3467 count = s.recv_into(b)
3468 return b[:count]
3469
3470 def _recvfrom_into():
3471 b = bytearray(b"\0"*100)
3472 count, addr = s.recvfrom_into(b)
3473 return b[:count]
3474
3475 # (name, method, expect success?, *args, return value func)
3476 send_methods = [
3477 ('send', s.send, True, [], len),
3478 ('sendto', s.sendto, False, ["some.address"], len),
3479 ('sendall', s.sendall, True, [], lambda x: None),
3480 ]
3481 # (name, method, whether to expect success, *args)
3482 recv_methods = [
3483 ('recv', s.recv, True, []),
3484 ('recvfrom', s.recvfrom, False, ["some.address"]),
3485 ('recv_into', _recv_into, True, []),
3486 ('recvfrom_into', _recvfrom_into, False, []),
3487 ]
3488 data_prefix = "PREFIX_"
3489
3490 for (meth_name, send_meth, expect_success, args,
3491 ret_val_meth) in send_methods:
3492 indata = (data_prefix + meth_name).encode('ascii')
3493 try:
3494 ret = send_meth(indata, *args)
3495 msg = "sending with {}".format(meth_name)
3496 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3497 outdata = s.read()
3498 if outdata != indata.lower():
3499 self.fail(
3500 "While sending with <<{name:s}>> bad data "
3501 "<<{outdata:r}>> ({nout:d}) received; "
3502 "expected <<{indata:r}>> ({nin:d})\n".format(
3503 name=meth_name, outdata=outdata[:20],
3504 nout=len(outdata),
3505 indata=indata[:20], nin=len(indata)
3506 )
3507 )
3508 except ValueError as e:
3509 if expect_success:
3510 self.fail(
3511 "Failed to send with method <<{name:s}>>; "
3512 "expected to succeed.\n".format(name=meth_name)
3513 )
3514 if not str(e).startswith(meth_name):
3515 self.fail(
3516 "Method <<{name:s}>> failed with unexpected "
3517 "exception message: {exp:s}\n".format(
3518 name=meth_name, exp=e
3519 )
3520 )
3521
3522 for meth_name, recv_meth, expect_success, args in recv_methods:
3523 indata = (data_prefix + meth_name).encode('ascii')
3524 try:
3525 s.send(indata)
3526 outdata = recv_meth(*args)
3527 if outdata != indata.lower():
3528 self.fail(
3529 "While receiving with <<{name:s}>> bad data "
3530 "<<{outdata:r}>> ({nout:d}) received; "
3531 "expected <<{indata:r}>> ({nin:d})\n".format(
3532 name=meth_name, outdata=outdata[:20],
3533 nout=len(outdata),
3534 indata=indata[:20], nin=len(indata)
3535 )
3536 )
3537 except ValueError as e:
3538 if expect_success:
3539 self.fail(
3540 "Failed to receive with method <<{name:s}>>; "
3541 "expected to succeed.\n".format(name=meth_name)
3542 )
3543 if not str(e).startswith(meth_name):
3544 self.fail(
3545 "Method <<{name:s}>> failed with unexpected "
3546 "exception message: {exp:s}\n".format(
3547 name=meth_name, exp=e
3548 )
3549 )
3550 # consume data
3551 s.read()
3552
3553 # read(-1, buffer) is supported, even though read(-1) is not
3554 data = b"data"
3555 s.send(data)
3556 buffer = bytearray(len(data))
3557 self.assertEqual(s.read(-1, buffer), len(data))
3558 self.assertEqual(buffer, data)
3559
Christian Heimes888bbdc2017-09-07 14:18:21 -07003560 # sendall accepts bytes-like objects
3561 if ctypes is not None:
3562 ubyte = ctypes.c_ubyte * len(data)
3563 byteslike = ubyte.from_buffer_copy(data)
3564 s.sendall(byteslike)
3565 self.assertEqual(s.read(), data)
3566
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003567 # Make sure sendmsg et al are disallowed to avoid
3568 # inadvertent disclosure of data and/or corruption
3569 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003570 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003571 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3572 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3573 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003574 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003575 s.write(b"over\n")
3576
3577 self.assertRaises(ValueError, s.recv, -1)
3578 self.assertRaises(ValueError, s.read, -1)
3579
3580 s.close()
3581
3582 def test_recv_zero(self):
3583 server = ThreadedEchoServer(CERTFILE)
3584 server.__enter__()
3585 self.addCleanup(server.__exit__, None, None)
3586 s = socket.create_connection((HOST, server.port))
3587 self.addCleanup(s.close)
3588 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3589 self.addCleanup(s.close)
3590
3591 # recv/read(0) should return no data
3592 s.send(b"data")
3593 self.assertEqual(s.recv(0), b"")
3594 self.assertEqual(s.read(0), b"")
3595 self.assertEqual(s.read(), b"data")
3596
3597 # Should not block if the other end sends no data
3598 s.setblocking(False)
3599 self.assertEqual(s.recv(0), b"")
3600 self.assertEqual(s.recv_into(bytearray()), 0)
3601
3602 def test_nonblocking_send(self):
3603 server = ThreadedEchoServer(CERTFILE,
3604 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003605 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003606 cacerts=CERTFILE,
3607 chatty=True,
3608 connectionchatty=False)
3609 with server:
3610 s = test_wrap_socket(socket.socket(),
3611 server_side=False,
3612 certfile=CERTFILE,
3613 ca_certs=CERTFILE,
3614 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003615 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003616 s.connect((HOST, server.port))
3617 s.setblocking(False)
3618
3619 # If we keep sending data, at some point the buffers
3620 # will be full and the call will block
3621 buf = bytearray(8192)
3622 def fill_buffer():
3623 while True:
3624 s.send(buf)
3625 self.assertRaises((ssl.SSLWantWriteError,
3626 ssl.SSLWantReadError), fill_buffer)
3627
3628 # Now read all the output and discard it
3629 s.setblocking(True)
3630 s.close()
3631
3632 def test_handshake_timeout(self):
3633 # Issue #5103: SSL handshake must respect the socket timeout
3634 server = socket.socket(socket.AF_INET)
3635 host = "127.0.0.1"
3636 port = support.bind_port(server)
3637 started = threading.Event()
3638 finish = False
3639
3640 def serve():
3641 server.listen()
3642 started.set()
3643 conns = []
3644 while not finish:
3645 r, w, e = select.select([server], [], [], 0.1)
3646 if server in r:
3647 # Let the socket hang around rather than having
3648 # it closed by garbage collection.
3649 conns.append(server.accept()[0])
3650 for sock in conns:
3651 sock.close()
3652
3653 t = threading.Thread(target=serve)
3654 t.start()
3655 started.wait()
3656
3657 try:
3658 try:
3659 c = socket.socket(socket.AF_INET)
3660 c.settimeout(0.2)
3661 c.connect((host, port))
3662 # Will attempt handshake and time out
3663 self.assertRaisesRegex(socket.timeout, "timed out",
3664 test_wrap_socket, c)
3665 finally:
3666 c.close()
3667 try:
3668 c = socket.socket(socket.AF_INET)
3669 c = test_wrap_socket(c)
3670 c.settimeout(0.2)
3671 # Will attempt handshake and time out
3672 self.assertRaisesRegex(socket.timeout, "timed out",
3673 c.connect, (host, port))
3674 finally:
3675 c.close()
3676 finally:
3677 finish = True
3678 t.join()
3679 server.close()
3680
3681 def test_server_accept(self):
3682 # Issue #16357: accept() on a SSLSocket created through
3683 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003684 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003685 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003686 context.load_verify_locations(SIGNING_CA)
3687 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003688 server = socket.socket(socket.AF_INET)
3689 host = "127.0.0.1"
3690 port = support.bind_port(server)
3691 server = context.wrap_socket(server, server_side=True)
3692 self.assertTrue(server.server_side)
3693
3694 evt = threading.Event()
3695 remote = None
3696 peer = None
3697 def serve():
3698 nonlocal remote, peer
3699 server.listen()
3700 # Block on the accept and wait on the connection to close.
3701 evt.set()
3702 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003703 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003704
3705 t = threading.Thread(target=serve)
3706 t.start()
3707 # Client wait until server setup and perform a connect.
3708 evt.wait()
3709 client = context.wrap_socket(socket.socket())
3710 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003711 client.send(b'data')
3712 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003713 client_addr = client.getsockname()
3714 client.close()
3715 t.join()
3716 remote.close()
3717 server.close()
3718 # Sanity checks.
3719 self.assertIsInstance(remote, ssl.SSLSocket)
3720 self.assertEqual(peer, client_addr)
3721
3722 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003723 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003724 with context.wrap_socket(socket.socket()) as sock:
3725 with self.assertRaises(OSError) as cm:
3726 sock.getpeercert()
3727 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3728
3729 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003730 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003731 with context.wrap_socket(socket.socket()) as sock:
3732 with self.assertRaises(OSError) as cm:
3733 sock.do_handshake()
3734 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3735
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003736 def test_no_shared_ciphers(self):
3737 client_context, server_context, hostname = testing_context()
3738 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3739 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003740 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003741 client_context.set_ciphers("AES128")
3742 server_context.set_ciphers("AES256")
3743 with ThreadedEchoServer(context=server_context) as server:
3744 with client_context.wrap_socket(socket.socket(),
3745 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003746 with self.assertRaises(OSError):
3747 s.connect((HOST, server.port))
3748 self.assertIn("no shared cipher", server.conn_errors[0])
3749
3750 def test_version_basic(self):
3751 """
3752 Basic tests for SSLSocket.version().
3753 More tests are done in the test_protocol_*() methods.
3754 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003755 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3756 context.check_hostname = False
3757 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003758 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003759 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003760 chatty=False) as server:
3761 with context.wrap_socket(socket.socket()) as s:
3762 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003763 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003764 s.connect((HOST, server.port))
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003765 if IS_OPENSSL_1_1_1 and has_tls_version('TLSv1_3'):
Christian Heimes05d9fe32018-02-27 08:55:39 +01003766 self.assertEqual(s.version(), 'TLSv1.3')
3767 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003768 self.assertEqual(s.version(), 'TLSv1.2')
3769 else: # 0.9.8 to 1.0.1
3770 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003771 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003772 self.assertIs(s.version(), None)
3773
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003774 @requires_tls_version('TLSv1_3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003775 def test_tls1_3(self):
3776 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3777 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003778 context.options |= (
3779 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3780 )
3781 with ThreadedEchoServer(context=context) as server:
3782 with context.wrap_socket(socket.socket()) as s:
3783 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003784 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003785 'TLS_AES_256_GCM_SHA384',
3786 'TLS_CHACHA20_POLY1305_SHA256',
3787 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003788 })
3789 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003790
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003791 @requires_minimum_version
3792 @requires_tls_version('TLSv1_2')
3793 def test_min_max_version_tlsv1_2(self):
Christian Heimes698dde12018-02-27 11:54:43 +01003794 client_context, server_context, hostname = testing_context()
3795 # client TLSv1.0 to 1.2
3796 client_context.minimum_version = ssl.TLSVersion.TLSv1
3797 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3798 # server only TLSv1.2
3799 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3800 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3801
3802 with ThreadedEchoServer(context=server_context) as server:
3803 with client_context.wrap_socket(socket.socket(),
3804 server_hostname=hostname) as s:
3805 s.connect((HOST, server.port))
3806 self.assertEqual(s.version(), 'TLSv1.2')
3807
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003808 @requires_minimum_version
3809 @requires_tls_version('TLSv1_1')
3810 def test_min_max_version_tlsv1_1(self):
3811 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003812 # client 1.0 to 1.2, server 1.0 to 1.1
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003813 client_context.minimum_version = ssl.TLSVersion.TLSv1
3814 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes698dde12018-02-27 11:54:43 +01003815 server_context.minimum_version = ssl.TLSVersion.TLSv1
3816 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3817
3818 with ThreadedEchoServer(context=server_context) as server:
3819 with client_context.wrap_socket(socket.socket(),
3820 server_hostname=hostname) as s:
3821 s.connect((HOST, server.port))
3822 self.assertEqual(s.version(), 'TLSv1.1')
3823
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003824 @requires_minimum_version
3825 @requires_tls_version('TLSv1_2')
3826 def test_min_max_version_mismatch(self):
3827 client_context, server_context, hostname = testing_context()
Christian Heimes698dde12018-02-27 11:54:43 +01003828 # client 1.0, server 1.2 (mismatch)
Christian Heimes698dde12018-02-27 11:54:43 +01003829 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimesc9bc49c2019-09-11 19:24:47 +02003830 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003831 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimesde606ea2019-09-11 19:48:58 +02003832 client_context.minimum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003833 with ThreadedEchoServer(context=server_context) as server:
3834 with client_context.wrap_socket(socket.socket(),
3835 server_hostname=hostname) as s:
3836 with self.assertRaises(ssl.SSLError) as e:
3837 s.connect((HOST, server.port))
3838 self.assertIn("alert", str(e.exception))
3839
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02003840 @requires_minimum_version
3841 @requires_tls_version('SSLv3')
Christian Heimes698dde12018-02-27 11:54:43 +01003842 def test_min_max_version_sslv3(self):
3843 client_context, server_context, hostname = testing_context()
3844 server_context.minimum_version = ssl.TLSVersion.SSLv3
3845 client_context.minimum_version = ssl.TLSVersion.SSLv3
3846 client_context.maximum_version = ssl.TLSVersion.SSLv3
3847 with ThreadedEchoServer(context=server_context) as server:
3848 with client_context.wrap_socket(socket.socket(),
3849 server_hostname=hostname) as s:
3850 s.connect((HOST, server.port))
3851 self.assertEqual(s.version(), 'SSLv3')
3852
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003853 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3854 def test_default_ecdh_curve(self):
3855 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3856 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003857 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003858 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003859 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3860 # cipher name.
3861 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003862 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3863 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3864 # our default cipher list should prefer ECDH-based ciphers
3865 # automatically.
3866 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3867 context.set_ciphers("ECCdraft:ECDH")
3868 with ThreadedEchoServer(context=context) as server:
3869 with context.wrap_socket(socket.socket()) as s:
3870 s.connect((HOST, server.port))
3871 self.assertIn("ECDH", s.cipher()[0])
3872
3873 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3874 "'tls-unique' channel binding not available")
3875 def test_tls_unique_channel_binding(self):
3876 """Test tls-unique channel binding."""
3877 if support.verbose:
3878 sys.stdout.write("\n")
3879
Christian Heimes05d9fe32018-02-27 08:55:39 +01003880 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003881
3882 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003883 chatty=True,
3884 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003885
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003886 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003887 with client_context.wrap_socket(
3888 socket.socket(),
3889 server_hostname=hostname) as s:
3890 s.connect((HOST, server.port))
3891 # get the data
3892 cb_data = s.get_channel_binding("tls-unique")
3893 if support.verbose:
3894 sys.stdout.write(
3895 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003896
Christian Heimes05d9fe32018-02-27 08:55:39 +01003897 # check if it is sane
3898 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003899 if s.version() == 'TLSv1.3':
3900 self.assertEqual(len(cb_data), 48)
3901 else:
3902 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003903
Christian Heimes05d9fe32018-02-27 08:55:39 +01003904 # and compare with the peers version
3905 s.write(b"CB tls-unique\n")
3906 peer_data_repr = s.read().strip()
3907 self.assertEqual(peer_data_repr,
3908 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003909
3910 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003911 with client_context.wrap_socket(
3912 socket.socket(),
3913 server_hostname=hostname) as s:
3914 s.connect((HOST, server.port))
3915 new_cb_data = s.get_channel_binding("tls-unique")
3916 if support.verbose:
3917 sys.stdout.write(
3918 "got another channel binding data: {0!r}\n".format(
3919 new_cb_data)
3920 )
3921 # is it really unique
3922 self.assertNotEqual(cb_data, new_cb_data)
3923 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003924 if s.version() == 'TLSv1.3':
3925 self.assertEqual(len(cb_data), 48)
3926 else:
3927 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003928 s.write(b"CB tls-unique\n")
3929 peer_data_repr = s.read().strip()
3930 self.assertEqual(peer_data_repr,
3931 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003932
3933 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003934 client_context, server_context, hostname = testing_context()
3935 stats = server_params_test(client_context, server_context,
3936 chatty=True, connectionchatty=True,
3937 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003938 if support.verbose:
3939 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3940 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3941
3942 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3943 "ssl.OP_NO_COMPRESSION needed for this test")
3944 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003945 client_context, server_context, hostname = testing_context()
3946 client_context.options |= ssl.OP_NO_COMPRESSION
3947 server_context.options |= ssl.OP_NO_COMPRESSION
3948 stats = server_params_test(client_context, server_context,
3949 chatty=True, connectionchatty=True,
3950 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003951 self.assertIs(stats['compression'], None)
3952
Paul Monsonf3550692019-06-19 13:09:54 -07003953 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003954 def test_dh_params(self):
3955 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003956 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003957 # test scenario needs TLS <= 1.2
3958 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003959 server_context.load_dh_params(DHFILE)
3960 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003961 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003962 stats = server_params_test(client_context, server_context,
3963 chatty=True, connectionchatty=True,
3964 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003965 cipher = stats["cipher"][0]
3966 parts = cipher.split("-")
3967 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3968 self.fail("Non-DH cipher: " + cipher[0])
3969
Christian Heimesb7b92252018-02-25 09:49:31 +01003970 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003971 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003972 def test_ecdh_curve(self):
3973 # server secp384r1, client auto
3974 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003975
Christian Heimesb7b92252018-02-25 09:49:31 +01003976 server_context.set_ecdh_curve("secp384r1")
3977 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3978 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3979 stats = server_params_test(client_context, server_context,
3980 chatty=True, connectionchatty=True,
3981 sni_name=hostname)
3982
3983 # server auto, client secp384r1
3984 client_context, server_context, hostname = testing_context()
3985 client_context.set_ecdh_curve("secp384r1")
3986 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3987 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3988 stats = server_params_test(client_context, server_context,
3989 chatty=True, connectionchatty=True,
3990 sni_name=hostname)
3991
3992 # server / client curve mismatch
3993 client_context, server_context, hostname = testing_context()
3994 client_context.set_ecdh_curve("prime256v1")
3995 server_context.set_ecdh_curve("secp384r1")
3996 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3997 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3998 try:
3999 stats = server_params_test(client_context, server_context,
4000 chatty=True, connectionchatty=True,
4001 sni_name=hostname)
4002 except ssl.SSLError:
4003 pass
4004 else:
4005 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01004006 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01004007 self.fail("mismatch curve did not fail")
4008
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004009 def test_selected_alpn_protocol(self):
4010 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02004011 client_context, server_context, hostname = testing_context()
4012 stats = server_params_test(client_context, server_context,
4013 chatty=True, connectionchatty=True,
4014 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004015 self.assertIs(stats['client_alpn_protocol'], None)
4016
4017 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
4018 def test_selected_alpn_protocol_if_server_uses_alpn(self):
4019 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02004020 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004021 server_context.set_alpn_protocols(['foo', 'bar'])
4022 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004023 chatty=True, connectionchatty=True,
4024 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004025 self.assertIs(stats['client_alpn_protocol'], None)
4026
4027 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
4028 def test_alpn_protocols(self):
4029 server_protocols = ['foo', 'bar', 'milkshake']
4030 protocol_tests = [
4031 (['foo', 'bar'], 'foo'),
4032 (['bar', 'foo'], 'foo'),
4033 (['milkshake'], 'milkshake'),
4034 (['http/3.0', 'http/4.0'], None)
4035 ]
4036 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004037 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004038 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004039 client_context.set_alpn_protocols(client_protocols)
4040
4041 try:
4042 stats = server_params_test(client_context,
4043 server_context,
4044 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02004045 connectionchatty=True,
4046 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004047 except ssl.SSLError as e:
4048 stats = e
4049
Christian Heimes05d9fe32018-02-27 08:55:39 +01004050 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004051 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
4052 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
4053 self.assertIsInstance(stats, ssl.SSLError)
4054 else:
4055 msg = "failed trying %s (s) and %s (c).\n" \
4056 "was expecting %s, but got %%s from the %%s" \
4057 % (str(server_protocols), str(client_protocols),
4058 str(expected))
4059 client_result = stats['client_alpn_protocol']
4060 self.assertEqual(client_result, expected,
4061 msg % (client_result, "client"))
4062 server_result = stats['server_alpn_protocols'][-1] \
4063 if len(stats['server_alpn_protocols']) else 'nothing'
4064 self.assertEqual(server_result, expected,
4065 msg % (server_result, "server"))
4066
4067 def test_selected_npn_protocol(self):
4068 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02004069 client_context, server_context, hostname = testing_context()
4070 stats = server_params_test(client_context, server_context,
4071 chatty=True, connectionchatty=True,
4072 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004073 self.assertIs(stats['client_npn_protocol'], None)
4074
4075 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
4076 def test_npn_protocols(self):
4077 server_protocols = ['http/1.1', 'spdy/2']
4078 protocol_tests = [
4079 (['http/1.1', 'spdy/2'], 'http/1.1'),
4080 (['spdy/2', 'http/1.1'], 'http/1.1'),
4081 (['spdy/2', 'test'], 'spdy/2'),
4082 (['abc', 'def'], 'abc')
4083 ]
4084 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02004085 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004086 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004087 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004088 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02004089 chatty=True, connectionchatty=True,
4090 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004091 msg = "failed trying %s (s) and %s (c).\n" \
4092 "was expecting %s, but got %%s from the %%s" \
4093 % (str(server_protocols), str(client_protocols),
4094 str(expected))
4095 client_result = stats['client_npn_protocol']
4096 self.assertEqual(client_result, expected, msg % (client_result, "client"))
4097 server_result = stats['server_npn_protocols'][-1] \
4098 if len(stats['server_npn_protocols']) else 'nothing'
4099 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4100
4101 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004102 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004103 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004104 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004105 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004106 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004107 client_context.load_verify_locations(SIGNING_CA)
4108 return server_context, other_context, client_context
4109
4110 def check_common_name(self, stats, name):
4111 cert = stats['peercert']
4112 self.assertIn((('commonName', name),), cert['subject'])
4113
4114 @needs_sni
4115 def test_sni_callback(self):
4116 calls = []
4117 server_context, other_context, client_context = self.sni_contexts()
4118
Christian Heimesa170fa12017-09-15 20:27:30 +02004119 client_context.check_hostname = False
4120
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004121 def servername_cb(ssl_sock, server_name, initial_context):
4122 calls.append((server_name, initial_context))
4123 if server_name is not None:
4124 ssl_sock.context = other_context
4125 server_context.set_servername_callback(servername_cb)
4126
4127 stats = server_params_test(client_context, server_context,
4128 chatty=True,
4129 sni_name='supermessage')
4130 # The hostname was fetched properly, and the certificate was
4131 # changed for the connection.
4132 self.assertEqual(calls, [("supermessage", server_context)])
4133 # CERTFILE4 was selected
4134 self.check_common_name(stats, 'fakehostname')
4135
4136 calls = []
4137 # The callback is called with server_name=None
4138 stats = server_params_test(client_context, server_context,
4139 chatty=True,
4140 sni_name=None)
4141 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004142 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004143
4144 # Check disabling the callback
4145 calls = []
4146 server_context.set_servername_callback(None)
4147
4148 stats = server_params_test(client_context, server_context,
4149 chatty=True,
4150 sni_name='notfunny')
4151 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004152 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004153 self.assertEqual(calls, [])
4154
4155 @needs_sni
4156 def test_sni_callback_alert(self):
4157 # Returning a TLS alert is reflected to the connecting client
4158 server_context, other_context, client_context = self.sni_contexts()
4159
4160 def cb_returning_alert(ssl_sock, server_name, initial_context):
4161 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4162 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004163 with self.assertRaises(ssl.SSLError) as cm:
4164 stats = server_params_test(client_context, server_context,
4165 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004166 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004167 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004169 @needs_sni
4170 def test_sni_callback_raising(self):
4171 # Raising fails the connection with a TLS handshake failure alert.
4172 server_context, other_context, client_context = self.sni_contexts()
4173
4174 def cb_raising(ssl_sock, server_name, initial_context):
4175 1/0
4176 server_context.set_servername_callback(cb_raising)
4177
Victor Stinner00253502019-06-03 03:51:43 +02004178 with support.catch_unraisable_exception() as catch:
4179 with self.assertRaises(ssl.SSLError) as cm:
4180 stats = server_params_test(client_context, server_context,
4181 chatty=False,
4182 sni_name='supermessage')
4183
4184 self.assertEqual(cm.exception.reason,
4185 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4186 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004187
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004188 @needs_sni
4189 def test_sni_callback_wrong_return_type(self):
4190 # Returning the wrong return type terminates the TLS connection
4191 # with an internal error alert.
4192 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004194 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4195 return "foo"
4196 server_context.set_servername_callback(cb_wrong_return_type)
4197
Victor Stinner00253502019-06-03 03:51:43 +02004198 with support.catch_unraisable_exception() as catch:
4199 with self.assertRaises(ssl.SSLError) as cm:
4200 stats = server_params_test(client_context, server_context,
4201 chatty=False,
4202 sni_name='supermessage')
4203
4204
4205 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4206 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004207
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004208 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004209 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004210 client_context.set_ciphers("AES128:AES256")
4211 server_context.set_ciphers("AES256")
4212 expected_algs = [
4213 "AES256", "AES-256",
4214 # TLS 1.3 ciphers are always enabled
4215 "TLS_CHACHA20", "TLS_AES",
4216 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004217
Christian Heimesa170fa12017-09-15 20:27:30 +02004218 stats = server_params_test(client_context, server_context,
4219 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004220 ciphers = stats['server_shared_ciphers'][0]
4221 self.assertGreater(len(ciphers), 0)
4222 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004223 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004224 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004225
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004226 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004227 client_context, server_context, hostname = testing_context()
4228 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004229
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004230 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004231 s = client_context.wrap_socket(socket.socket(),
4232 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004233 s.connect((HOST, server.port))
4234 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004235
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004236 self.assertRaises(ValueError, s.read, 1024)
4237 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004238
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004239 def test_sendfile(self):
4240 TEST_DATA = b"x" * 512
4241 with open(support.TESTFN, 'wb') as f:
4242 f.write(TEST_DATA)
4243 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004244 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004245 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004246 context.load_verify_locations(SIGNING_CA)
4247 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004248 server = ThreadedEchoServer(context=context, chatty=False)
4249 with server:
4250 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004251 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004252 with open(support.TESTFN, 'rb') as file:
4253 s.sendfile(file)
4254 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004255
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004256 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004257 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004258 # TODO: sessions aren't compatible with TLSv1.3 yet
4259 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004260
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004261 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004262 stats = server_params_test(client_context, server_context,
4263 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004264 session = stats['session']
4265 self.assertTrue(session.id)
4266 self.assertGreater(session.time, 0)
4267 self.assertGreater(session.timeout, 0)
4268 self.assertTrue(session.has_ticket)
4269 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4270 self.assertGreater(session.ticket_lifetime_hint, 0)
4271 self.assertFalse(stats['session_reused'])
4272 sess_stat = server_context.session_stats()
4273 self.assertEqual(sess_stat['accept'], 1)
4274 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004275
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004276 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004277 stats = server_params_test(client_context, server_context,
4278 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004279 sess_stat = server_context.session_stats()
4280 self.assertEqual(sess_stat['accept'], 2)
4281 self.assertEqual(sess_stat['hits'], 1)
4282 self.assertTrue(stats['session_reused'])
4283 session2 = stats['session']
4284 self.assertEqual(session2.id, session.id)
4285 self.assertEqual(session2, session)
4286 self.assertIsNot(session2, session)
4287 self.assertGreaterEqual(session2.time, session.time)
4288 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004289
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004290 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004291 stats = server_params_test(client_context, server_context,
4292 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004293 self.assertFalse(stats['session_reused'])
4294 session3 = stats['session']
4295 self.assertNotEqual(session3.id, session.id)
4296 self.assertNotEqual(session3, session)
4297 sess_stat = server_context.session_stats()
4298 self.assertEqual(sess_stat['accept'], 3)
4299 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004300
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004301 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004302 stats = server_params_test(client_context, server_context,
4303 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004304 self.assertTrue(stats['session_reused'])
4305 session4 = stats['session']
4306 self.assertEqual(session4.id, session.id)
4307 self.assertEqual(session4, session)
4308 self.assertGreaterEqual(session4.time, session.time)
4309 self.assertGreaterEqual(session4.timeout, session.timeout)
4310 sess_stat = server_context.session_stats()
4311 self.assertEqual(sess_stat['accept'], 4)
4312 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004313
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004314 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004315 client_context, server_context, hostname = testing_context()
4316 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004317
Christian Heimes05d9fe32018-02-27 08:55:39 +01004318 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004319 client_context.options |= ssl.OP_NO_TLSv1_3
4320 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004321
Christian Heimesa170fa12017-09-15 20:27:30 +02004322 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004323 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004324 with client_context.wrap_socket(socket.socket(),
4325 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004326 # session is None before handshake
4327 self.assertEqual(s.session, None)
4328 self.assertEqual(s.session_reused, None)
4329 s.connect((HOST, server.port))
4330 session = s.session
4331 self.assertTrue(session)
4332 with self.assertRaises(TypeError) as e:
4333 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004334 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004335
Christian Heimesa170fa12017-09-15 20:27:30 +02004336 with client_context.wrap_socket(socket.socket(),
4337 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004338 s.connect((HOST, server.port))
4339 # cannot set session after handshake
4340 with self.assertRaises(ValueError) as e:
4341 s.session = session
4342 self.assertEqual(str(e.exception),
4343 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004344
Christian Heimesa170fa12017-09-15 20:27:30 +02004345 with client_context.wrap_socket(socket.socket(),
4346 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004347 # can set session before handshake and before the
4348 # connection was established
4349 s.session = session
4350 s.connect((HOST, server.port))
4351 self.assertEqual(s.session.id, session.id)
4352 self.assertEqual(s.session, session)
4353 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004354
Christian Heimesa170fa12017-09-15 20:27:30 +02004355 with client_context2.wrap_socket(socket.socket(),
4356 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004357 # cannot re-use session with a different SSLContext
4358 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004359 s.session = session
4360 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004361 self.assertEqual(str(e.exception),
4362 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004363
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004364
Christian Heimesdf6ac7e2019-09-26 17:02:59 +02004365@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
Christian Heimes9fb051f2018-09-23 08:32:31 +02004366class TestPostHandshakeAuth(unittest.TestCase):
4367 def test_pha_setter(self):
4368 protocols = [
4369 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4370 ]
4371 for protocol in protocols:
4372 ctx = ssl.SSLContext(protocol)
4373 self.assertEqual(ctx.post_handshake_auth, False)
4374
4375 ctx.post_handshake_auth = True
4376 self.assertEqual(ctx.post_handshake_auth, True)
4377
4378 ctx.verify_mode = ssl.CERT_REQUIRED
4379 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4380 self.assertEqual(ctx.post_handshake_auth, True)
4381
4382 ctx.post_handshake_auth = False
4383 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4384 self.assertEqual(ctx.post_handshake_auth, False)
4385
4386 ctx.verify_mode = ssl.CERT_OPTIONAL
4387 ctx.post_handshake_auth = True
4388 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4389 self.assertEqual(ctx.post_handshake_auth, True)
4390
4391 def test_pha_required(self):
4392 client_context, server_context, hostname = testing_context()
4393 server_context.post_handshake_auth = True
4394 server_context.verify_mode = ssl.CERT_REQUIRED
4395 client_context.post_handshake_auth = True
4396 client_context.load_cert_chain(SIGNED_CERTFILE)
4397
4398 server = ThreadedEchoServer(context=server_context, chatty=False)
4399 with server:
4400 with client_context.wrap_socket(socket.socket(),
4401 server_hostname=hostname) as s:
4402 s.connect((HOST, server.port))
4403 s.write(b'HASCERT')
4404 self.assertEqual(s.recv(1024), b'FALSE\n')
4405 s.write(b'PHA')
4406 self.assertEqual(s.recv(1024), b'OK\n')
4407 s.write(b'HASCERT')
4408 self.assertEqual(s.recv(1024), b'TRUE\n')
4409 # PHA method just returns true when cert is already available
4410 s.write(b'PHA')
4411 self.assertEqual(s.recv(1024), b'OK\n')
4412 s.write(b'GETCERT')
4413 cert_text = s.recv(4096).decode('us-ascii')
4414 self.assertIn('Python Software Foundation CA', cert_text)
4415
4416 def test_pha_required_nocert(self):
4417 client_context, server_context, hostname = testing_context()
4418 server_context.post_handshake_auth = True
4419 server_context.verify_mode = ssl.CERT_REQUIRED
4420 client_context.post_handshake_auth = True
4421
Victor Stinner73ea5462019-07-09 14:33:49 +02004422 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4423 # (it is only raised sometimes on Windows)
4424 with support.catch_threading_exception() as cm:
4425 server = ThreadedEchoServer(context=server_context, chatty=False)
4426 with server:
4427 with client_context.wrap_socket(socket.socket(),
4428 server_hostname=hostname) as s:
4429 s.connect((HOST, server.port))
4430 s.write(b'PHA')
4431 # receive CertificateRequest
4432 self.assertEqual(s.recv(1024), b'OK\n')
4433 # send empty Certificate + Finish
4434 s.write(b'HASCERT')
4435 # receive alert
4436 with self.assertRaisesRegex(
4437 ssl.SSLError,
4438 'tlsv13 alert certificate required'):
4439 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004440
4441 def test_pha_optional(self):
4442 if support.verbose:
4443 sys.stdout.write("\n")
4444
4445 client_context, server_context, hostname = testing_context()
4446 server_context.post_handshake_auth = True
4447 server_context.verify_mode = ssl.CERT_REQUIRED
4448 client_context.post_handshake_auth = True
4449 client_context.load_cert_chain(SIGNED_CERTFILE)
4450
4451 # check CERT_OPTIONAL
4452 server_context.verify_mode = ssl.CERT_OPTIONAL
4453 server = ThreadedEchoServer(context=server_context, chatty=False)
4454 with server:
4455 with client_context.wrap_socket(socket.socket(),
4456 server_hostname=hostname) as s:
4457 s.connect((HOST, server.port))
4458 s.write(b'HASCERT')
4459 self.assertEqual(s.recv(1024), b'FALSE\n')
4460 s.write(b'PHA')
4461 self.assertEqual(s.recv(1024), b'OK\n')
4462 s.write(b'HASCERT')
4463 self.assertEqual(s.recv(1024), b'TRUE\n')
4464
4465 def test_pha_optional_nocert(self):
4466 if support.verbose:
4467 sys.stdout.write("\n")
4468
4469 client_context, server_context, hostname = testing_context()
4470 server_context.post_handshake_auth = True
4471 server_context.verify_mode = ssl.CERT_OPTIONAL
4472 client_context.post_handshake_auth = True
4473
4474 server = ThreadedEchoServer(context=server_context, chatty=False)
4475 with server:
4476 with client_context.wrap_socket(socket.socket(),
4477 server_hostname=hostname) as s:
4478 s.connect((HOST, server.port))
4479 s.write(b'HASCERT')
4480 self.assertEqual(s.recv(1024), b'FALSE\n')
4481 s.write(b'PHA')
4482 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004483 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004484 s.write(b'HASCERT')
4485 self.assertEqual(s.recv(1024), b'FALSE\n')
4486
4487 def test_pha_no_pha_client(self):
4488 client_context, server_context, hostname = testing_context()
4489 server_context.post_handshake_auth = True
4490 server_context.verify_mode = ssl.CERT_REQUIRED
4491 client_context.load_cert_chain(SIGNED_CERTFILE)
4492
4493 server = ThreadedEchoServer(context=server_context, chatty=False)
4494 with server:
4495 with client_context.wrap_socket(socket.socket(),
4496 server_hostname=hostname) as s:
4497 s.connect((HOST, server.port))
4498 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4499 s.verify_client_post_handshake()
4500 s.write(b'PHA')
4501 self.assertIn(b'extension not received', s.recv(1024))
4502
4503 def test_pha_no_pha_server(self):
4504 # server doesn't have PHA enabled, cert is requested in handshake
4505 client_context, server_context, hostname = testing_context()
4506 server_context.verify_mode = ssl.CERT_REQUIRED
4507 client_context.post_handshake_auth = True
4508 client_context.load_cert_chain(SIGNED_CERTFILE)
4509
4510 server = ThreadedEchoServer(context=server_context, chatty=False)
4511 with server:
4512 with client_context.wrap_socket(socket.socket(),
4513 server_hostname=hostname) as s:
4514 s.connect((HOST, server.port))
4515 s.write(b'HASCERT')
4516 self.assertEqual(s.recv(1024), b'TRUE\n')
4517 # PHA doesn't fail if there is already a cert
4518 s.write(b'PHA')
4519 self.assertEqual(s.recv(1024), b'OK\n')
4520 s.write(b'HASCERT')
4521 self.assertEqual(s.recv(1024), b'TRUE\n')
4522
4523 def test_pha_not_tls13(self):
4524 # TLS 1.2
4525 client_context, server_context, hostname = testing_context()
4526 server_context.verify_mode = ssl.CERT_REQUIRED
4527 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4528 client_context.post_handshake_auth = True
4529 client_context.load_cert_chain(SIGNED_CERTFILE)
4530
4531 server = ThreadedEchoServer(context=server_context, chatty=False)
4532 with server:
4533 with client_context.wrap_socket(socket.socket(),
4534 server_hostname=hostname) as s:
4535 s.connect((HOST, server.port))
4536 # PHA fails for TLS != 1.3
4537 s.write(b'PHA')
4538 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4539
Christian Heimesf0f59302019-07-01 08:29:17 +02004540 def test_bpo37428_pha_cert_none(self):
4541 # verify that post_handshake_auth does not implicitly enable cert
4542 # validation.
4543 hostname = SIGNED_CERTFILE_HOSTNAME
4544 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4545 client_context.post_handshake_auth = True
4546 client_context.load_cert_chain(SIGNED_CERTFILE)
4547 # no cert validation and CA on client side
4548 client_context.check_hostname = False
4549 client_context.verify_mode = ssl.CERT_NONE
4550
4551 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4552 server_context.load_cert_chain(SIGNED_CERTFILE)
4553 server_context.load_verify_locations(SIGNING_CA)
4554 server_context.post_handshake_auth = True
4555 server_context.verify_mode = ssl.CERT_REQUIRED
4556
4557 server = ThreadedEchoServer(context=server_context, chatty=False)
4558 with server:
4559 with client_context.wrap_socket(socket.socket(),
4560 server_hostname=hostname) as s:
4561 s.connect((HOST, server.port))
4562 s.write(b'HASCERT')
4563 self.assertEqual(s.recv(1024), b'FALSE\n')
4564 s.write(b'PHA')
4565 self.assertEqual(s.recv(1024), b'OK\n')
4566 s.write(b'HASCERT')
4567 self.assertEqual(s.recv(1024), b'TRUE\n')
4568 # server cert has not been validated
4569 self.assertEqual(s.getpeercert(), {})
4570
Christian Heimes9fb051f2018-09-23 08:32:31 +02004571
Christian Heimesc7f70692019-05-31 11:44:05 +02004572HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4573requires_keylog = unittest.skipUnless(
4574 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4575
4576class TestSSLDebug(unittest.TestCase):
4577
4578 def keylog_lines(self, fname=support.TESTFN):
4579 with open(fname) as f:
4580 return len(list(f))
4581
4582 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004583 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004584 def test_keylog_defaults(self):
4585 self.addCleanup(support.unlink, support.TESTFN)
4586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4587 self.assertEqual(ctx.keylog_filename, None)
4588
4589 self.assertFalse(os.path.isfile(support.TESTFN))
4590 ctx.keylog_filename = support.TESTFN
4591 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4592 self.assertTrue(os.path.isfile(support.TESTFN))
4593 self.assertEqual(self.keylog_lines(), 1)
4594
4595 ctx.keylog_filename = None
4596 self.assertEqual(ctx.keylog_filename, None)
4597
4598 with self.assertRaises((IsADirectoryError, PermissionError)):
4599 # Windows raises PermissionError
4600 ctx.keylog_filename = os.path.dirname(
4601 os.path.abspath(support.TESTFN))
4602
4603 with self.assertRaises(TypeError):
4604 ctx.keylog_filename = 1
4605
4606 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004607 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004608 def test_keylog_filename(self):
4609 self.addCleanup(support.unlink, support.TESTFN)
4610 client_context, server_context, hostname = testing_context()
4611
4612 client_context.keylog_filename = support.TESTFN
4613 server = ThreadedEchoServer(context=server_context, chatty=False)
4614 with server:
4615 with client_context.wrap_socket(socket.socket(),
4616 server_hostname=hostname) as s:
4617 s.connect((HOST, server.port))
4618 # header, 5 lines for TLS 1.3
4619 self.assertEqual(self.keylog_lines(), 6)
4620
4621 client_context.keylog_filename = None
4622 server_context.keylog_filename = support.TESTFN
4623 server = ThreadedEchoServer(context=server_context, chatty=False)
4624 with server:
4625 with client_context.wrap_socket(socket.socket(),
4626 server_hostname=hostname) as s:
4627 s.connect((HOST, server.port))
4628 self.assertGreaterEqual(self.keylog_lines(), 11)
4629
4630 client_context.keylog_filename = support.TESTFN
4631 server_context.keylog_filename = support.TESTFN
4632 server = ThreadedEchoServer(context=server_context, chatty=False)
4633 with server:
4634 with client_context.wrap_socket(socket.socket(),
4635 server_hostname=hostname) as s:
4636 s.connect((HOST, server.port))
4637 self.assertGreaterEqual(self.keylog_lines(), 21)
4638
4639 client_context.keylog_filename = None
4640 server_context.keylog_filename = None
4641
4642 @requires_keylog
4643 @unittest.skipIf(sys.flags.ignore_environment,
4644 "test is not compatible with ignore_environment")
Paul Monsonf3550692019-06-19 13:09:54 -07004645 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004646 def test_keylog_env(self):
4647 self.addCleanup(support.unlink, support.TESTFN)
4648 with unittest.mock.patch.dict(os.environ):
4649 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4650 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4651
4652 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4653 self.assertEqual(ctx.keylog_filename, None)
4654
4655 ctx = ssl.create_default_context()
4656 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4657
4658 ctx = ssl._create_stdlib_context()
4659 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4660
4661 def test_msg_callback(self):
4662 client_context, server_context, hostname = testing_context()
4663
4664 def msg_cb(conn, direction, version, content_type, msg_type, data):
4665 pass
4666
4667 self.assertIs(client_context._msg_callback, None)
4668 client_context._msg_callback = msg_cb
4669 self.assertIs(client_context._msg_callback, msg_cb)
4670 with self.assertRaises(TypeError):
4671 client_context._msg_callback = object()
4672
4673 def test_msg_callback_tls12(self):
4674 client_context, server_context, hostname = testing_context()
4675 client_context.options |= ssl.OP_NO_TLSv1_3
4676
4677 msg = []
4678
4679 def msg_cb(conn, direction, version, content_type, msg_type, data):
4680 self.assertIsInstance(conn, ssl.SSLSocket)
4681 self.assertIsInstance(data, bytes)
4682 self.assertIn(direction, {'read', 'write'})
4683 msg.append((direction, version, content_type, msg_type))
4684
4685 client_context._msg_callback = msg_cb
4686
4687 server = ThreadedEchoServer(context=server_context, chatty=False)
4688 with server:
4689 with client_context.wrap_socket(socket.socket(),
4690 server_hostname=hostname) as s:
4691 s.connect((HOST, server.port))
4692
Christian Heimese35d1ba2019-06-03 20:40:15 +02004693 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004694 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4695 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004696 msg
4697 )
4698 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004699 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4700 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004701 msg
4702 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004703
4704
Thomas Woutersed03b412007-08-28 21:37:11 +00004705def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004706 if support.verbose:
4707 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004708 'Mac': platform.mac_ver,
4709 'Windows': platform.win32_ver,
4710 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004711 for name, func in plats.items():
4712 plat = func()
4713 if plat and plat[0]:
4714 plat = '%s %r' % (name, plat)
4715 break
4716 else:
4717 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004718 print("test_ssl: testing with %r %r" %
4719 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4720 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004721 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004722 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4723 try:
4724 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4725 except AttributeError:
4726 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004727
Antoine Pitrou152efa22010-05-16 18:19:27 +00004728 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004729 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004730 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004731 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004732 BADCERT, BADKEY, EMPTYCERT]:
4733 if not os.path.exists(filename):
4734 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004735
Martin Panter3840b2a2016-03-27 01:53:46 +00004736 tests = [
4737 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004738 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004739 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004740 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004741
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004742 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004743 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004744
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004745 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004746 try:
4747 support.run_unittest(*tests)
4748 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004749 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004750
4751if __name__ == "__main__":
4752 test_main()