blob: d48d6e5569fc3e8f880c91ab5b142bb662a1628c [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
# ssl is imported through the test-support helper instead of a plain
# ``import`` statement.
ssl = support.import_module("ssl")


# Sorted keys of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST

# Which TLS library (and which release of it) the ssl module reports.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = (not IS_LIBRESSL) and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = (not IS_LIBRESSL) and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)

# Build-time configuration variable of the same name.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
# Map each available PROTOCOL_* constant to the ssl.TLSVersion member it is
# paired with below.  Pairs whose protocol constant or TLSVersion member
# is missing from this build are left out of the mapping.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in (
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
):
    try:
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        pass
    else:
        PROTOCOL_TO_TLS_VERSION[proto] = ver
48
def data_file(*name):
    """Return the path built from this file's directory and *name*.

    The positional arguments are joined, in order, onto the directory that
    contains this module.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000051
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key + certificate, and the same material split into two files.
CERTFILE = data_file("keycert.pem")
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")

# Filesystem-encoded (bytes) spellings of the paths above.
BYTES_CERTFILE = os.fsencode(CERTFILE)
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Password-protected variants and the password that goes with them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and two individual CA files stored inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
70
# Expected decoding of CERTFILE; test_parse_cert compares the parsed
# certificate against this dictionary.
CERTFILE_INFO = dict(
    issuer=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    notAfter='Aug 26 14:23:15 2028 GMT',
    notBefore='Aug 29 14:23:15 2018 GMT',
    serialNumber='98A7CF88C74A32ED',
    subject=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    subjectAltName=(('DNS', 'localhost'),),
    version=3,
)
Antoine Pitrou152efa22010-05-16 18:19:27 +000086
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
# Host name that testing_context() pairs with SIGNED_CERTFILE.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010093
# Expected decoding of SIGNED_CERTFILE; test_parse_cert compares the parsed
# certificate against this dictionary.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # ASN1_TIME_print() pads a single-digit day with a leading space, so
    # the day field is two characters wide: "Jul  7", not "Jul 7".
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3,
}
111
# Second signed certificate and an ECC certificate, each with the host
# name used together with it.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately broken, missing or unusual certificate/key files.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

# Diffie-Hellman parameter file, as str and as filesystem-encoded bytes.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
# Not defined in all versions of OpenSSL; a missing option falls back to 0.
(
    OP_NO_COMPRESSION,
    OP_SINGLE_DH_USE,
    OP_SINGLE_ECDH_USE,
    OP_CIPHER_SERVER_PREFERENCE,
    OP_ENABLE_MIDDLEBOX_COMPAT,
) = (
    getattr(ssl, option_name, 0)
    for option_name in (
        "OP_NO_COMPRESSION",
        "OP_SINGLE_DH_USE",
        "OP_SINGLE_ECDH_USE",
        "OP_CIPHER_SERVER_PREFERENCE",
        "OP_ENABLE_MIDDLEBOX_COMPAT",
    )
)
Christian Heimes358cfd42016-09-10 22:43:48 +0200142
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100143
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    The traceback of the exception reported by ``sys.exc_info()`` is always
    formatted, but it is only written when ``support.verbose`` is true.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted_lines = traceback.format_exception(exc_type, exc_value, exc_tb)
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000148
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +0000152
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL version is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
156
def have_verify_flags():
    """Return True when the OpenSSL version is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
160
def _have_secp_curves():
    """Return True when a server context accepts the "secp384r1" curve.

    False is returned when the ssl module reports no ECDH support, or when
    selecting the curve raises ValueError.
    """
    if ssl.HAS_ECDH:
        probe_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe_context.set_ecdh_curve("secp384r1")
        except ValueError:
            return False
        return True
    return False


# Evaluated once, at import time.
HAVE_SECP_CURVES = _have_secp_curves()
174
175
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset is used when the
    time module reports a DST zone and DST is in effect for the current
    local time; otherwise the standard offset is used.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    return -time.altzone if dst_in_effect else -time.timezone
181
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    Only OpenSSL API version 0.9.8i is special-cased (it ignores seconds,
    see #18207); for every other version the string is returned unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    # 0.9.8.i: zero the seconds field.
    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000195
# Decorator: skip the decorated test unless the ssl module reports SNI support.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
197
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
Christian Heimesd0486372016-09-10 23:23:33 +0200199def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
200 cert_reqs=ssl.CERT_NONE, ca_certs=None,
201 ciphers=None, certfile=None, keyfile=None,
202 **kwargs):
203 context = ssl.SSLContext(ssl_version)
204 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200205 if cert_reqs == ssl.CERT_NONE:
206 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200207 context.verify_mode = cert_reqs
208 if ca_certs is not None:
209 context.load_verify_locations(ca_certs)
210 if certfile is not None or keyfile is not None:
211 context.load_cert_chain(certfile, keyfile)
212 if ciphers is not None:
213 context.set_ciphers(ciphers)
214 return context.wrap_socket(sock, **kwargs)
215
Christian Heimesa170fa12017-09-15 20:27:30 +0200216
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for candidate, candidate_hostname in known_certs:
        if server_cert == candidate:
            hostname = candidate_hostname
            break
    else:
        # Only the two signed test certificates are supported.
        raise ValueError(server_cert)

    # The client trusts the signing CA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    # The server presents the chosen certificate and also loads the CA.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
237
238
Antoine Pitrou152efa22010-05-16 18:19:27 +0000239class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000240
Antoine Pitrou480a1242010-04-28 21:37:09 +0000241 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000242 ssl.CERT_NONE
243 ssl.CERT_OPTIONAL
244 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100245 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100246 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100247 if ssl.HAS_ECDH:
248 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100249 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
250 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000251 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100252 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700253 ssl.OP_NO_SSLv2
254 ssl.OP_NO_SSLv3
255 ssl.OP_NO_TLSv1
256 ssl.OP_NO_TLSv1_3
257 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
258 ssl.OP_NO_TLSv1_1
259 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200260 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000261
Christian Heimes9d50ab52018-02-27 10:17:30 +0100262 def test_private_init(self):
263 with self.assertRaisesRegex(TypeError, "public constructor"):
264 with socket.socket() as s:
265 ssl.SSLSocket(s)
266
Antoine Pitrou172f0252014-04-18 20:33:08 +0200267 def test_str_for_enums(self):
268 # Make sure that the PROTOCOL_* constants have enum-like string
269 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200270 proto = ssl.PROTOCOL_TLS
271 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200272 ctx = ssl.SSLContext(proto)
273 self.assertIs(ctx.protocol, proto)
274
def test_random(self):
    # Exercise the RAND_* helpers exported by the ssl module.
    v = ssl.RAND_status()
    if support.verbose:
        if v:
            description = "sufficient randomness"
        else:
            description = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, description))

    pseudo_bytes, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo_bytes), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if not v:
        # RAND_bytes() must fail when the PRNG status is falsy.
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong_bytes = ssl.RAND_bytes(16)
        self.assertEqual(len(strong_bytes), 16)

    # negative num is invalid
    for rand_func in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, rand_func, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)

    # RAND_add() is called with str, bytes and bytearray data.
    for seed in ("this is a random string",
                 b"this is a random bytes object",
                 bytearray(b"this is a random bytearray object")):
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000301
Christian Heimesf77b4b22013-08-21 13:26:05 +0200302 @unittest.skipUnless(os.name == 'posix', 'requires posix')
303 def test_random_fork(self):
304 status = ssl.RAND_status()
305 if not status:
306 self.fail("OpenSSL's PRNG has insufficient randomness")
307
308 rfd, wfd = os.pipe()
309 pid = os.fork()
310 if pid == 0:
311 try:
312 os.close(rfd)
313 child_random = ssl.RAND_pseudo_bytes(16)[0]
314 self.assertEqual(len(child_random), 16)
315 os.write(wfd, child_random)
316 os.close(wfd)
317 except BaseException:
318 os._exit(1)
319 else:
320 os._exit(0)
321 else:
322 os.close(wfd)
323 self.addCleanup(os.close, rfd)
324 _, status = os.waitpid(pid, 0)
325 self.assertEqual(status, 0)
326
327 child_random = os.read(rfd, 16)
328 self.assertEqual(len(child_random), 16)
329 parent_random = ssl.RAND_pseudo_bytes(16)[0]
330 self.assertEqual(len(parent_random), 16)
331
332 self.assertNotEqual(child_random, parent_random)
333
Christian Heimese6dac002018-08-30 07:25:49 +0200334 maxDiff = None
335
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    self.assertEqual(decode(CERTFILE), CERTFILE_INFO)
    self.assertEqual(decode(SIGNED_CERTFILE), SIGNED_CERTFILE_INFO)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    parsed = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(parsed) + "\n")
    expected_san = (
        ('DNS', 'projects.developer.nokia.com'),
        ('DNS', 'projects.forum.nokia.com'),
    )
    self.assertEqual(parsed['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(parsed['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        parsed['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',),
    )
    self.assertEqual(
        parsed['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',),
    )
Thomas Woutersed03b412007-08-28 21:37:11 +0000364
def test_parse_cert_CVE_2019_5010(self):
    # The TALOS_INVALID_CRLDP certificate must decode to exactly the
    # dictionary below.
    parsed = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(parsed) + "\n")
    host = 'codenomicon-vm-2.test.lal.cisco.com'
    expected = {
        'issuer': ((('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
        'notAfter': 'Jun 14 18:00:58 2028 GMT',
        'notBefore': 'Jun 18 18:00:58 2018 GMT',
        'serialNumber': '02',
        'subject': ((('countryName', 'UK'),), (('commonName', host),)),
        'subjectAltName': (('DNS', host),),
        'version': 3,
    }
    self.assertEqual(parsed, expected)
385
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate names must be preserved in the
    # decoded result.
    parsed = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(parsed) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    # The expected subject and issuer are identical.
    self.assertEqual(parsed['subject'], expected_name)
    self.assertEqual(parsed['issuer'], expected_name)

    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(parsed['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200414
def test_parse_all_sans(self):
    # ALLSANFILE carries every kind of subject alternative name; check
    # how each entry is decoded.
    directory_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', directory_name),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    parsed = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(parsed['subjectAltName'], expected_san)
435
def test_DER_to_PEM(self):
    # Round trip PEM -> DER -> PEM -> DER; both DER results must match and
    # the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % regenerated_pem)
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000447
def test_openssl_version(self):
    # The three OPENSSL_VERSION* constants must have the right types and
    # plausible, mutually consistent values.
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    for value, expected_type in ((number, int), (info, tuple), (text, str)):
        self.assertIsInstance(value, expected_type)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix),
                    (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000478
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    raw_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(raw_sock)
    wrapped_ref = weakref.ref(wrapped)
    # Dropping the only strong reference may emit a ResourceWarning.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    # The weak reference must already be dead.
    self.assertEqual(wrapped_ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000489
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    raw_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(raw_sock) as ss:
        # (expected exception, method, positional arguments)
        expectations = (
            (OSError, ss.recv, (1,)),
            (OSError, ss.recv_into, (bytearray(b'x'),)),
            (OSError, ss.recvfrom, (1,)),
            (OSError, ss.recvfrom_into, (bytearray(b'x'), 1)),
            (OSError, ss.send, (b'x',)),
            (OSError, ss.sendto, (b'x', ('0.0.0.0', 0))),
            (NotImplementedError, ss.dup, ()),
            (NotImplementedError, ss.sendmsg,
             ([b'x'], (), 0, ('0.0.0.0', 0))),
            (NotImplementedError, ss.recvmsg, (100,)),
            (NotImplementedError, ss.recvmsg_into, ([bytearray(100)],)),
        )
        for expected_exc, method, args in expectations:
            self.assertRaises(expected_exc, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000507
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    timeouts = (None, 0.0, 5.0)
    for expected_timeout in timeouts:
        raw_sock = socket.socket(socket.AF_INET)
        raw_sock.settimeout(expected_timeout)
        with test_wrap_socket(raw_sock) as wrapped:
            self.assertEqual(expected_timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000516
def test_errors_sslwrap(self):
    # Argument validation and missing-file errors of ssl.wrap_socket().
    sock = socket.socket()
    # Close the raw socket even if an assertion fails before the
    # ``with ssl.wrap_socket(...)`` block below is reached; closing a
    # socket more than once is harmless.
    self.addCleanup(sock.close)

    # A key file without a certificate file is rejected.
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    # Server-side sockets need a (non-empty) certificate file.
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True, certfile="")
    # A server-side socket refuses to connect().
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # Missing certificate and/or key files surface as OSError with ENOENT.
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock,
                            certfile=CERTFILE, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock,
                            certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000545
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the file name relative to this module's directory, falling
    # back to the current directory when that directory name is empty.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    plain_sock = socket.socket()
    self.addCleanup(plain_sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(plain_sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000555
556 def test_empty_cert(self):
557 """Wrapping with an empty cert file"""
558 self.bad_cert_test("nullcert.pem")
559
560 def test_malformed_cert(self):
561 """Wrapping with a badly formatted certificate (syntax error)"""
562 self.bad_cert_test("badcert.pem")
563
564 def test_malformed_key(self):
565 """Wrapping with a badly formatted key (syntax error)"""
566 self.bad_cert_test("badkey.pem")
567
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000568 def test_match_hostname(self):
569 def ok(cert, hostname):
570 ssl.match_hostname(cert, hostname)
571 def fail(cert, hostname):
572 self.assertRaises(ssl.CertificateError,
573 ssl.match_hostname, cert, hostname)
574
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100575 # -- Hostname matching --
576
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000577 cert = {'subject': ((('commonName', 'example.com'),),)}
578 ok(cert, 'example.com')
579 ok(cert, 'ExAmple.cOm')
580 fail(cert, 'www.example.com')
581 fail(cert, '.example.com')
582 fail(cert, 'example.org')
583 fail(cert, 'exampleXcom')
584
585 cert = {'subject': ((('commonName', '*.a.com'),),)}
586 ok(cert, 'foo.a.com')
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
Mandeep Singhede2ac92017-11-27 04:01:27 +0530592 # only match wildcards when they are the only thing
593 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000594 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530595 fail(cert, 'foo.com')
596 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000597 fail(cert, 'bar.com')
598 fail(cert, 'foo.a.com')
599 fail(cert, 'bar.foo.com')
600
Christian Heimes824f7f32013-08-17 00:54:47 +0200601 # NULL bytes are bad, CVE-2013-4073
602 cert = {'subject': ((('commonName',
603 'null.python.org\x00example.org'),),)}
604 ok(cert, 'null.python.org\x00example.org') # or raise an error?
605 fail(cert, 'example.org')
606 fail(cert, 'null.python.org')
607
Georg Brandl72c98d32013-10-27 07:16:53 +0100608 # error cases with wildcards
609 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
610 fail(cert, 'bar.foo.a.com')
611 fail(cert, 'a.com')
612 fail(cert, 'Xa.com')
613 fail(cert, '.a.com')
614
615 cert = {'subject': ((('commonName', 'a.*.com'),),)}
616 fail(cert, 'a.foo.com')
617 fail(cert, 'a..com')
618 fail(cert, 'a.com')
619
620 # wildcard doesn't match IDNA prefix 'xn--'
621 idna = 'püthon.python.org'.encode("idna").decode("ascii")
622 cert = {'subject': ((('commonName', idna),),)}
623 ok(cert, idna)
624 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
625 fail(cert, idna)
626 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
627 fail(cert, idna)
628
629 # wildcard in first fragment and IDNA A-labels in sequent fragments
630 # are supported.
631 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
632 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530633 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
634 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100635 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
637
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000638 # Slightly fake real-world example
639 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
640 'subject': ((('commonName', 'linuxfrz.org'),),),
641 'subjectAltName': (('DNS', 'linuxfr.org'),
642 ('DNS', 'linuxfr.com'),
643 ('othername', '<unsupported>'))}
644 ok(cert, 'linuxfr.org')
645 ok(cert, 'linuxfr.com')
646 # Not a "DNS" entry
647 fail(cert, '<unsupported>')
648 # When there is a subjectAltName, commonName isn't used
649 fail(cert, 'linuxfrz.org')
650
651 # A pristine real-world example
652 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
653 'subject': ((('countryName', 'US'),),
654 (('stateOrProvinceName', 'California'),),
655 (('localityName', 'Mountain View'),),
656 (('organizationName', 'Google Inc'),),
657 (('commonName', 'mail.google.com'),))}
658 ok(cert, 'mail.google.com')
659 fail(cert, 'gmail.com')
660 # Only commonName is considered
661 fail(cert, 'California')
662
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100663 # -- IPv4 matching --
664 cert = {'subject': ((('commonName', 'example.com'),),),
665 'subjectAltName': (('DNS', 'example.com'),
666 ('IP Address', '10.11.12.13'),
667 ('IP Address', '14.15.16.17'))}
668 ok(cert, '10.11.12.13')
669 ok(cert, '14.15.16.17')
670 fail(cert, '14.15.16.18')
671 fail(cert, 'example.net')
672
673 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100674 if hasattr(socket, 'AF_INET6'):
675 cert = {'subject': ((('commonName', 'example.com'),),),
676 'subjectAltName': (
677 ('DNS', 'example.com'),
678 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
679 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
680 ok(cert, '2001::cafe')
681 ok(cert, '2003::baba')
682 fail(cert, '2003::bebe')
683 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100684
685 # -- Miscellaneous --
686
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000687 # Neither commonName nor subjectAltName
688 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
689 'subject': ((('countryName', 'US'),),
690 (('stateOrProvinceName', 'California'),),
691 (('localityName', 'Mountain View'),),
692 (('organizationName', 'Google Inc'),))}
693 fail(cert, 'mail.google.com')
694
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200695 # No DNS entry in subjectAltName but a commonName
696 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
697 'subject': ((('countryName', 'US'),),
698 (('stateOrProvinceName', 'California'),),
699 (('localityName', 'Mountain View'),),
700 (('commonName', 'mail.google.com'),)),
701 'subjectAltName': (('othername', 'blabla'), )}
702 ok(cert, 'mail.google.com')
703
704 # No DNS entry subjectAltName and no commonName
705 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
706 'subject': ((('countryName', 'US'),),
707 (('stateOrProvinceName', 'California'),),
708 (('localityName', 'Mountain View'),),
709 (('organizationName', 'Google Inc'),)),
710 'subjectAltName': (('othername', 'blabla'),)}
711 fail(cert, 'google.com')
712
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000713 # Empty cert / no cert
714 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
715 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
716
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200717 # Issue #17980: avoid denials of service by refusing more than one
718 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100719 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
720 with self.assertRaisesRegex(
721 ssl.CertificateError,
722 "partial wildcards in leftmost label are not supported"):
723 ssl.match_hostname(cert, 'axxb.example.com')
724
725 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
726 with self.assertRaisesRegex(
727 ssl.CertificateError,
728 "wildcard can only be present in the leftmost label"):
729 ssl.match_hostname(cert, 'www.sub.example.com')
730
731 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
732 with self.assertRaisesRegex(
733 ssl.CertificateError,
734 "too many wildcards"):
735 ssl.match_hostname(cert, 'axxbxxc.example.com')
736
737 cert = {'subject': ((('commonName', '*'),),)}
738 with self.assertRaisesRegex(
739 ssl.CertificateError,
740 "sole wildcard without additional labels are not support"):
741 ssl.match_hostname(cert, 'host')
742
743 cert = {'subject': ((('commonName', '*.com'),),)}
744 with self.assertRaisesRegex(
745 ssl.CertificateError,
746 r"hostname 'com' doesn't match '\*.com'"):
747 ssl.match_hostname(cert, 'com')
748
749 # extra checks for _inet_paton()
750 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
751 with self.assertRaises(ValueError):
752 ssl._inet_paton(invalid)
753 for ipaddr in ['127.0.0.1', '192.168.0.1']:
754 self.assertTrue(ssl._inet_paton(ipaddr))
755 if hasattr(socket, 'AF_INET6'):
756 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
757 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200758
Antoine Pitroud5323212010-10-22 18:19:07 +0000759 def test_server_side(self):
760 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200761 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000762 with socket.socket() as sock:
763 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
764 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000765
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.create_server(('127.0.0.1', 0))
    # Register the cleanups right away so neither socket leaks when
    # connect(), the wrapping, or the assertion below fails.
    self.addCleanup(listener.close)
    client = socket.socket(socket.AF_INET)
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    # No handshake is performed: the listener never accepts the connection.
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200775
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected socket should return None for a known binding type,
    # both when wrapped client-side and when wrapped server-side.
    client_opts = {}
    server_opts = {"server_side": True, "certfile": CERTFILE}
    for wrap_opts in (client_opts, server_opts):
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_opts) as wrapped:
            self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200787
def test_dealloc_warn(self):
    # Dropping an unclosed wrapped socket should emit a ResourceWarning
    # whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Release the only reference, then force a collection so the
        # warning is raised inside the assertWarns block.
        del wrapped
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
795
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths tuple.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_DIR / SSL_CERT_FILE must be reflected in capath / cafile.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
807
Christian Heimes44109d72013-11-22 01:51:30 +0100808 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
809 def test_enum_certificates(self):
810 self.assertTrue(ssl.enum_certificates("CA"))
811 self.assertTrue(ssl.enum_certificates("ROOT"))
812
813 self.assertRaises(TypeError, ssl.enum_certificates)
814 self.assertRaises(WindowsError, ssl.enum_certificates, "")
815
Christian Heimesc2d65e12013-11-22 16:13:55 +0100816 trust_oids = set()
817 for storename in ("CA", "ROOT"):
818 store = ssl.enum_certificates(storename)
819 self.assertIsInstance(store, list)
820 for element in store:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 3)
823 cert, enc, trust = element
824 self.assertIsInstance(cert, bytes)
825 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
826 self.assertIsInstance(trust, (set, bool))
827 if isinstance(trust, set):
828 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100829
830 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100831 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200832
Christian Heimes46bebee2013-06-09 19:03:31 +0200833 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100834 def test_enum_crls(self):
835 self.assertTrue(ssl.enum_crls("CA"))
836 self.assertRaises(TypeError, ssl.enum_crls)
837 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200838
Christian Heimes44109d72013-11-22 01:51:30 +0100839 crls = ssl.enum_crls("CA")
840 self.assertIsInstance(crls, list)
841 for element in crls:
842 self.assertIsInstance(element, tuple)
843 self.assertEqual(len(element), 2)
844 self.assertIsInstance(element[0], bytes)
845 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200846
Christian Heimes46bebee2013-06-09 19:03:31 +0200847
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100848 def test_asn1object(self):
849 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
850 '1.3.6.1.5.5.7.3.1')
851
852 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
853 self.assertEqual(val, expected)
854 self.assertEqual(val.nid, 129)
855 self.assertEqual(val.shortname, 'serverAuth')
856 self.assertEqual(val.longname, 'TLS Web Server Authentication')
857 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
858 self.assertIsInstance(val, ssl._ASN1Object)
859 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
860
861 val = ssl._ASN1Object.fromnid(129)
862 self.assertEqual(val, expected)
863 self.assertIsInstance(val, ssl._ASN1Object)
864 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100865 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
866 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100867 for i in range(1000):
868 try:
869 obj = ssl._ASN1Object.fromnid(i)
870 except ValueError:
871 pass
872 else:
873 self.assertIsInstance(obj.nid, int)
874 self.assertIsInstance(obj.shortname, str)
875 self.assertIsInstance(obj.longname, str)
876 self.assertIsInstance(obj.oid, (str, type(None)))
877
878 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
879 self.assertEqual(val, expected)
880 self.assertIsInstance(val, ssl._ASN1Object)
881 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
882 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
883 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100884 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
885 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100886
Christian Heimes72d28502013-11-23 13:56:58 +0100887 def test_purpose_enum(self):
888 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
889 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
890 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
891 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
892 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
893 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
894 '1.3.6.1.5.5.7.3.1')
895
896 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
897 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
898 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
899 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
900 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
901 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
902 '1.3.6.1.5.5.7.3.2')
903
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must raise NotImplementedError, both via
    # the module-level helper and via SSLContext.wrap_socket().
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
914
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that *timestring* converts to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
917
def cert_time_fail(self, timestring):
    # Helper: assert that *timestring* is rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
921
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, epoch_seconds in known_conversions:
        self.cert_time_ok(cert_time, epoch_seconds)
929
930 def test_cert_time_to_seconds(self):
931 timestring = "Jan 5 09:34:43 2018 GMT"
932 ts = 1515144883.0
933 self.cert_time_ok(timestring, ts)
934 # accept keyword parameter, assert its name
935 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
936 # accept both %e and %d (space or zero generated by strftime)
937 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
938 # case-insensitive
939 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
940 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
941 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
942 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
943 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
944 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
945 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
946 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
947
948 newyear_ts = 1230768000.0
949 # leap seconds
950 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
951 # same timestamp
952 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
953
954 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
955 # allow 60th second (even if it is not a leap second)
956 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
957 # allow 2nd leap second for compatibility with time.strptime()
958 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
959 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
960
Mike53f7a7c2017-12-14 14:04:53 +0300961 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200962 # 99991231235959Z (rfc 5280)
963 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
964
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February in the locale active for this test.
    local_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if local_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_feb + " 9 00:00:00 2007 GMT")
979
def test_connect_ex_error(self):
    # connect_ex() to a bound-but-not-listening port must report one of
    # the expected errno values.
    reserved = socket.socket(socket.AF_INET)
    self.addCleanup(reserved.close)
    port = support.bind_port(reserved)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
995
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100996
Antoine Pitrou152efa22010-05-16 18:19:27 +0000997class ContextTests(unittest.TestCase):
998
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for version in PROTOCOLS:
        ssl.SSLContext(version)
    # Without an argument the context uses PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
1006
def test_protocol(self):
    # The protocol attribute reports the constant given to the constructor.
    for version in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(version).protocol, version)
1011
1012 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001013 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001014 ctx.set_ciphers("ALL")
1015 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001016 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001017 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001018
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in any enabled cipher suite name.
    forbidden_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in forbidden_markers:
            self.assertNotIn(marker, suite_name)
1031
Christian Heimes25bfcd52016-09-06 00:04:45 +02001032 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1033 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001034 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001035 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001036 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001037 self.assertIn('AES256-GCM-SHA384', names)
1038 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001039
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    base_flags = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    context_flags = (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                     OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                     OP_ENABLE_MIDDLEBOX_COMPAT)
    expected = base_flags | context_flags
    self.assertEqual(expected, client_ctx.options)

    # Adding an option is always possible.
    client_ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Options cannot be cleared here, so resetting must fail.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options = (client_ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1060
Christian Heimesa170fa12017-09-15 20:27:30 +02001061 def test_verify_mode_protocol(self):
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001063 # Default value
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1065 ctx.verify_mode = ssl.CERT_OPTIONAL
1066 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1067 ctx.verify_mode = ssl.CERT_REQUIRED
1068 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1069 ctx.verify_mode = ssl.CERT_NONE
1070 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1071 with self.assertRaises(TypeError):
1072 ctx.verify_mode = None
1073 with self.assertRaises(ValueError):
1074 ctx.verify_mode = 42
1075
Christian Heimesa170fa12017-09-15 20:27:30 +02001076 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1078 self.assertFalse(ctx.check_hostname)
1079
1080 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1081 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1082 self.assertTrue(ctx.check_hostname)
1083
Christian Heimes61d478c2018-01-27 15:51:38 +01001084 def test_hostname_checks_common_name(self):
1085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1086 self.assertTrue(ctx.hostname_checks_common_name)
1087 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1088 ctx.hostname_checks_common_name = True
1089 self.assertTrue(ctx.hostname_checks_common_name)
1090 ctx.hostname_checks_common_name = False
1091 self.assertFalse(ctx.hostname_checks_common_name)
1092 ctx.hostname_checks_common_name = True
1093 self.assertTrue(ctx.hostname_checks_common_name)
1094 else:
1095 with self.assertRaises(AttributeError):
1096 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001097
Christian Heimes698dde12018-02-27 11:54:43 +01001098 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1099 "required OpenSSL 1.1.0g")
1100 def test_min_max_version(self):
1101 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001102 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1103 # Fedora override the setting to TLS 1.0.
1104 self.assertIn(
1105 ctx.minimum_version,
Victor Stinner3ef63442019-02-19 18:06:03 +01001106 {ssl.TLSVersion.MINIMUM_SUPPORTED,
1107 # Fedora 29 uses TLS 1.0 by default
1108 ssl.TLSVersion.TLSv1,
1109 # RHEL 8 uses TLS 1.2 by default
1110 ssl.TLSVersion.TLSv1_2}
Christian Heimes698dde12018-02-27 11:54:43 +01001111 )
1112 self.assertEqual(
1113 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1114 )
1115
1116 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1117 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1118 self.assertEqual(
1119 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1120 )
1121 self.assertEqual(
1122 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1123 )
1124
1125 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1126 ctx.maximum_version = ssl.TLSVersion.TLSv1
1127 self.assertEqual(
1128 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1129 )
1130 self.assertEqual(
1131 ctx.maximum_version, ssl.TLSVersion.TLSv1
1132 )
1133
1134 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 )
1138
1139 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1140 self.assertIn(
1141 ctx.maximum_version,
1142 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1143 )
1144
1145 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1146 self.assertIn(
1147 ctx.minimum_version,
1148 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1149 )
1150
1151 with self.assertRaises(ValueError):
1152 ctx.minimum_version = 42
1153
1154 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1155
1156 self.assertEqual(
1157 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1158 )
1159 self.assertEqual(
1160 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1161 )
1162 with self.assertRaises(ValueError):
1163 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1164 with self.assertRaises(ValueError):
1165 ctx.maximum_version = ssl.TLSVersion.TLSv1
1166
1167
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Each assignment below must be read back unchanged; the last entry
    # shows that combined values are supported as well.
    assignable = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignable:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1187
def test_load_cert_chain(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    for key_arg in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=key_arg)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for broken in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(broken)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: the password may be given as
    # str, bytes or bytearray.
    password_forms = (
        KEY_PASSWORD,
        KEY_PASSWORD.encode(),
        bytearray(KEY_PASSWORD.encode()),
    )
    for secret in password_forms:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms:
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def getpass_exception():
        raise Exception('getpass error')

    class PasswordProvider:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001270
def test_load_verify_locations(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given as str or bytes, positionally or by keyword.
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(ca_file)
        ctx.load_verify_locations(cafile=ca_file, capath=None)
    # At least one location must be supplied.
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    # A CA directory can be combined with a CA file (str or bytes path).
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1289
def test_load_verify_cadata(self):
    # Load CA certificates through the `cadata` argument, in PEM (str)
    # and DER (bytes) form, and check the CA count in the cert store.
    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def ca_count(context):
        return context.cert_store_stats()["x509_ca"]

    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM; the last entry reloads a cert already in the hash table
    context = new_context()
    self.assertEqual(ca_count(context), 0)
    pem_steps = ((cacert_pem, 1), (neuronio_pem, 2), (neuronio_pem, 2))
    for pem, expected in pem_steps:
        context.load_verify_locations(cadata=pem)
        self.assertEqual(ca_count(context), expected)

    # combined
    context = new_context()
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=both_pem)
    self.assertEqual(ca_count(context), 2)

    # with junk around the certs
    context = new_context()
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(pieces))
    self.assertEqual(ca_count(context), 2)

    # test DER
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(context), 2)

    # combined
    context = new_context()
    both_der = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=both_der)
    self.assertEqual(ca_count(context), 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1346
1347
def test_load_dh_params(self):
    # load_dh_params() accepts a DH parameter file and rejects missing
    # arguments, nonexistent paths and files that hold no DH parameters.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is skipped on Windows (os.name == 'nt').
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as caught:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)

    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1360
def test_session_stats(self):
    # A freshly created context reports zero for every session counter,
    # whatever protocol it was created with.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for proto in PROTOCOLS:
        context = ssl.SSLContext(proto)
        self.assertEqual(context.session_stats(), all_zero)
1377
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001378 def test_set_default_verify_paths(self):
1379 # There's not much we can do to test that it acts as expected,
1380 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001381 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001382 ctx.set_default_verify_paths()
1383
Antoine Pitrou501da612011-12-21 09:27:41 +01001384 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001385 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001386 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001387 ctx.set_ecdh_curve("prime256v1")
1388 ctx.set_ecdh_curve(b"prime256v1")
1389 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1390 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1391 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1392 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1393
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    # Both clearing the callback and installing a real one must succeed.
    for accepted in (None, noop_callback):
        context.set_servername_callback(accepted)
1408
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold a reference to the
    # context, while the context holds the callback: a cycle.
    def callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)
    del context, callback
    gc.collect()
    self.assertIsNone(context_ref())
1421
def test_cert_store_stats(self):
    # Track how cert_store_stats() changes as certificates are loaded.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect(x509_ca, x509):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    expect(x509_ca=0, x509=0)
    # Loading the context's own cert chain leaves the store unchanged.
    context.load_cert_chain(CERTFILE)
    expect(x509_ca=0, x509=0)
    context.load_verify_locations(CERTFILE)
    expect(x509_ca=0, x509=1)
    context.load_verify_locations(CAFILE_CACERT)
    expect(x509_ca=1, x509=2)
1435
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])

    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])

    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    # Issuer and subject are expected to be the same name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # With a true argument the certs come back DER-encoded instead.
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1463
Christian Heimes72d28502013-11-23 13:56:58 +01001464 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001465 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001466 ctx.load_default_certs()
1467
Christian Heimesa170fa12017-09-15 20:27:30 +02001468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001469 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1470 ctx.load_default_certs()
1471
Christian Heimesa170fa12017-09-15 20:27:30 +02001472 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001473 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1474
Christian Heimesa170fa12017-09-15 20:27:30 +02001475 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001476 self.assertRaises(TypeError, ctx.load_default_certs, None)
1477 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1478
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR and SSL_CERT_FILE pointing at the test fixtures,
    # the store must report one certificate, no CA certs and no CRLs.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1488
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what load_default_certs() yields without the env vars.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected_stats = baseline_ctx.cert_store_stats()

    # With the env vars set, one extra certificate is expected.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1503
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set in ctx.options.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when their value is non-zero.
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1518
def test_create_default_context(self):
    # Check protocol, verify mode and option flags of the contexts
    # produced by ssl.create_default_context().
    def check(context, verify_mode, hostname_checked=False):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked:
            self.assertTrue(context.check_hostname)
        self._assert_context_options(context)

    check(ssl.create_default_context(), ssl.CERT_REQUIRED,
          hostname_checked=True)

    with open(SIGNING_CA) as stream:
        ca_text = stream.read()
    with_locations = ssl.create_default_context(
        cafile=SIGNING_CA, capath=CAPATH, cadata=ca_text)
    check(with_locations, ssl.CERT_REQUIRED)

    check(ssl.create_default_context(ssl.Purpose.CLIENT_AUTH),
          ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001539
def test__create_stdlib_context(self):
    # Check the contexts produced by the private
    # ssl._create_stdlib_context() helper for several argument sets.
    def check(context, protocol, verify_mode, hostname_checked=None):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        # check_hostname is only asserted when an expectation is given.
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    check(ssl._create_stdlib_context(),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
          ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    strict = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        check_hostname=True)
    check(strict, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
          hostname_checked=True)

    check(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001564
Christian Heimes1aa9a752013-12-02 02:41:19 +01001565 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001568 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001569
Christian Heimese82c0342017-09-15 20:29:57 +02001570 # Auto set CERT_REQUIRED
1571 ctx.check_hostname = True
1572 self.assertTrue(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1574 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575 ctx.verify_mode = ssl.CERT_REQUIRED
1576 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001577 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001578
Christian Heimese82c0342017-09-15 20:29:57 +02001579 # Changing verify_mode does not affect check_hostname
1580 ctx.check_hostname = False
1581 ctx.verify_mode = ssl.CERT_NONE
1582 ctx.check_hostname = False
1583 self.assertFalse(ctx.check_hostname)
1584 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1585 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001586 ctx.check_hostname = True
1587 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 ctx.check_hostname = False
1591 ctx.verify_mode = ssl.CERT_OPTIONAL
1592 ctx.check_hostname = False
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1595 # keep CERT_OPTIONAL
1596 ctx.check_hostname = True
1597 self.assertTrue(ctx.check_hostname)
1598 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001599
1600 # Cannot set CERT_NONE with check_hostname enabled
1601 with self.assertRaises(ValueError):
1602 ctx.verify_mode = ssl.CERT_NONE
1603 ctx.check_hostname = False
1604 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001605 ctx.verify_mode = ssl.CERT_NONE
1606 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001607
Christian Heimes5fe668c2016-09-12 00:01:11 +02001608 def test_context_client_server(self):
1609 # PROTOCOL_TLS_CLIENT has sane defaults
1610 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1611 self.assertTrue(ctx.check_hostname)
1612 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1613
1614 # PROTOCOL_TLS_SERVER has different but also sane defaults
1615 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1616 self.assertFalse(ctx.check_hostname)
1617 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1618
Christian Heimes4df60f12017-09-15 20:26:05 +02001619 def test_context_custom_class(self):
1620 class MySSLSocket(ssl.SSLSocket):
1621 pass
1622
1623 class MySSLObject(ssl.SSLObject):
1624 pass
1625
1626 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1627 ctx.sslsocket_class = MySSLSocket
1628 ctx.sslobject_class = MySSLObject
1629
1630 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1631 self.assertIsInstance(sock, MySSLSocket)
1632 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1633 self.assertIsInstance(obj, MySSLObject)
1634
Antoine Pitrou152efa22010-05-16 18:19:27 +00001635
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # same for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_class(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.create_server(("127.0.0.1", 0)) as listener:
            raw_client = socket.create_connection(listener.getsockname())
            raw_client.setblocking(False)
            wrapped = context.wrap_socket(raw_client, False,
                                          do_handshake_on_connect=False)
            with wrapped as client:
                # The listener never accepts or answers, so the
                # non-blocking handshake has nothing to read yet.
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    client.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        context = ssl.create_default_context()
        # Empty or leading-dot hostnames are rejected with ValueError.
        for hostname in ("", ".example.org"):
            with self.assertRaises(ValueError):
                context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                                 server_hostname=hostname)
        # A hostname with an embedded NUL is rejected with TypeError.
        with self.assertRaises(TypeError):
            context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                             server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001687
1688
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data written comes back on read(); reading an empty BIO
        # yields b''.
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        for chunk in (b'foo', b'bar'):
            bio.write(chunk)
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        bio.write(b'baz')
        # Sized reads return at most the requested number of bytes.
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(bio.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() has been called and all
        # buffered data has been read.
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        # pending tracks the number of buffered bytes.
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        for consumed in range(1, 4):
            bio.read(1)
            self.assertEqual(bio.pending, 3 - consumed)
        for written in range(1, 4):
            bio.write(b'x')
            self.assertEqual(bio.pending, written)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike.
        bio = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            bio.write(payload)
            self.assertEqual(bio.read(), expected)

    def test_error_types(self):
        # write() rejects objects that are not bytes-like.
        bio = ssl.MemoryBIO()
        for not_bytes in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                bio.write(not_bytes)
1750
1751
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # SSLObject cannot be instantiated directly.
        bio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(bio, bio)

    def test_unwrap(self):
        client_ctx, server_ctx, hostname = testing_context()
        client_in, client_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        server_in, server_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        client = client_ctx.wrap_bio(client_in, client_out,
                                     server_hostname=hostname)
        server = server_ctx.wrap_bio(server_in, server_out,
                                     server_side=True)

        def advance(endpoint, outgoing, peer_incoming):
            # Run one handshake step, then forward whatever the endpoint
            # produced to its peer's incoming BIO.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass
            if outgoing.pending:
                peer_incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance(client, client_out, server_in)
            advance(server, server_out, client_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        server_in.write(client_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        client_in.write(server_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001799
Martin Panter3840b2a2016-03-27 01:53:46 +00001800class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001801 """Tests that connect to a simple server running in the background"""
1802
def setUp(self):
    # Start a background echo server for the test and register its
    # shutdown as a cleanup.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    # Enter the server's context manually; the matching __exit__ runs at
    # cleanup time with no exception information.
    echo_server.__enter__()
    no_exc_info = (None, None, None)
    self.addCleanup(echo_server.__exit__, *no_exc_info)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001808
def test_connect(self):
    # Without verification, getpeercert() returns an empty dict.
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001823
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001833
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    plain_sock = socket.socket(socket.AF_INET)
    conn = test_wrap_socket(plain_sock,
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001842
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    wait_seconds = 5.0
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], wait_seconds)
    # Non-blocking handshake: retry until it completes, waiting for the
    # socket to become readable or writable as the SSL layer requests.
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], wait_seconds)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], wait_seconds)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001868
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrapped(**options):
        return context.wrap_socket(socket.socket(socket.AF_INET), **options)

    with wrapped() as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())

    # Same with a server hostname
    with wrapped(server_hostname="dummy") as conn:
        conn.connect(self.server_addr)

    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrapped() as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1886
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001897
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The check runs once with a str path and once with a bytes path,
    # each time on a fresh context.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001920
def test_connect_cadata(self):
    # Verify the server certificate using in-memory CA data, first as PEM
    # text and then as the equivalent DER bytes.
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for cadata in (pem, der):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=cadata)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001941
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    conn = test_wrap_socket(socket.socket(socket.AF_INET))
    conn.connect(self.server_addr)
    fd = conn.fileno()
    reader = conn.makefile()
    reader.close()
    # Closing the file object alone must leave the descriptor usable.
    os.read(fd, 0)
    # Once the SSL socket itself is closed, the descriptor must be gone.
    conn.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001960
def test_non_blocking_handshake(self):
    # Drive the TLS handshake by hand on a non-blocking socket, waiting
    # with select() whenever OpenSSL asks for more I/O.
    raw = socket.socket(socket.AF_INET)
    raw.connect(self.server_addr)
    raw.setblocking(False)
    tls = test_wrap_socket(raw,
                           cert_reqs=ssl.CERT_NONE,
                           do_handshake_on_connect=False)
    self.addCleanup(tls.close)
    attempts = 0
    finished = False
    while not finished:
        attempts += 1
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            select.select([tls], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [tls], [])
        else:
            finished = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001981
def test_get_server_certificate(self):
    # Fetch the local test server's certificate, validating it against
    # the signing CA.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001984
def test_get_server_certificate_fail(self):
    # A failed connection crashes ThreadedEchoServer, so this check gets
    # a test method (and therefore a server) of its own.
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001989
def test_ciphers(self):
    # Valid cipher strings must allow a connection to be established.
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as sock:
            bad = test_wrap_socket(sock,
                                   cert_reqs=ssl.CERT_NONE,
                                   ciphers="^$:,;?*'dorothyx")
            bad.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002003
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: nothing is reported before the
    # first handshake, one CA certificate afterwards.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])
    client = context.wrap_socket(socket.socket(socket.AF_INET),
                                 server_hostname='localhost')
    with client as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002015
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    original = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    original.load_verify_locations(capath=CAPATH)
    replacement = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    replacement.load_verify_locations(capath=CAPATH)
    raw = socket.socket(socket.AF_INET)
    with original.wrap_socket(raw, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level object report
        # the context the socket was created with...
        self.assertIs(conn.context, original)
        self.assertIs(conn._sslobj.context, original)
        # ...and both follow an assignment to the `context` attribute.
        conn.context = replacement
        self.assertIs(conn.context, replacement)
        self.assertIs(conn._sslobj.context, replacement)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002031
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling bytes between sock and the BIOs.

    func is retried for as long as it raises an SSLError whose errno is
    SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE; any other SSLError is
    re-raised.  After every attempt the contents of *outgoing* are sent to
    *sock*; on WANT_READ, data received from *sock* is written to
    *incoming* (or EOF is signalled when nothing was received).

    The only keyword consulted is ``timeout`` (seconds, default 10); when
    it is exceeded the test fails.  Returns the value returned by func.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        wanted = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            wanted = exc.errno
            if wanted not in retryable:
                raise
        # Whatever happened, flush pending TLS records to the peer.
        sock.sendall(outgoing.read())
        if wanted is None:
            # func completed without asking for more I/O.
            break
        if wanted == ssl.SSL_ERROR_WANT_READ:
            received = sock.recv(32768)
            if received:
                incoming.write(received)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return ret
2068
def test_bio_handshake(self):
    # Handshake and shutdown through an SSLObject backed by memory BIOs,
    # checking its state before and after the handshake.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()

    def pump(operation):
        return self.ssl_io_loop(sock, incoming, outgoing, operation)

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    ctx.load_verify_locations(SIGNING_CA)
    sslobj = ctx.wrap_bio(incoming, outgoing, server_side=False,
                          server_hostname=SIGNED_CERTFILE_HOSTNAME)
    self.assertIs(sslobj._sslobj.owner, sslobj)

    # Before the handshake.
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    pump(sslobj.do_handshake)

    # After the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        pump(sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2102
def test_bio_read_write_data(self):
    # Round-trip one line through the echo server using an SSLObject
    # backed by memory BIOs; the server answers in lower case.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()

    def pump(operation, *op_args):
        return self.ssl_io_loop(sock, incoming, outgoing,
                                operation, *op_args)

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_NONE
    sslobj = ctx.wrap_bio(incoming, outgoing, False)
    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002118
2119
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each one runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            client.settimeout(0.0000001)
            result = client.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2141
Martin Panter3840b2a2016-03-27 01:53:46 +00002142
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of (host, port) twice and fail *test* if empty.

    The first fetch uses the default arguments of
    ssl.get_server_certificate(); the second passes *cert* as ``ca_certs``.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2153
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA raises SSLError.

    *test* is failed if a certificate is returned instead.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2163
2164
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002165from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002166
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a background thread for the SSL tests.

    run() accepts connections on a locally bound socket and hands each one
    to a ConnectionHandler thread, joining that handler before accepting
    the next connection.  Used as a context manager, the server is started
    on entry (waiting until it is listening) and stopped and joined on exit.
    """

    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning server, the accepted socket and the peer
            address; the socket is switched to blocking mode."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn() once the connection is TLS-wrapped.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock server-side using the server's SSL context.

            Returns True on success.  On failure the error text is appended
            to server.conn_errors, the connection is closed and False is
            returned; for SSLError/OSError other than reset, broken pipe
            or abort, the whole server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record what was negotiated and, when
                # verbose, report the peer certificate and cipher.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if wrapped, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the TLS connection if wrapped, else to the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over' or an error.

            Unless the server is in STARTTLS mode the connection is wrapped
            first.  Each message read is stripped and dispatched:
            empty -> EOF (unwrap and close); b'over' -> close and return;
            b'STARTTLS' / b'ENDTLS' (STARTTLS mode only) -> switch TLS on or
            off; b'CB tls-unique', b'PHA', b'HASCERT', b'GETCERT' -> the
            corresponding query on the TLS connection; anything else is
            echoed back in lower case.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except ssl.SSLError as err:
                    # On Windows sometimes test_pha_required_nocert receives the
                    # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                    # before the 'tlsv13 alert certificate required' exception.
                    # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                    # is received test_pha_required_nocert fails with ConnectionResetError
                    # because the underlying socket is closed
                    #
                    # NOTE(review): an SSLError with any other reason is
                    # swallowed here and the loop keeps running -- this
                    # clause must stay ahead of the OSError one below.
                    if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If *context* is given it is used as the server's SSLContext;
        otherwise one is built from *ssl_version* (default
        PROTOCOL_TLS_SERVER) and *certreqs* (default CERT_NONE), loading
        *cacerts* and *certificate* when given.
        NOTE(review): whether the npn/alpn/ciphers settings are also
        skipped when *context* is passed depends on the reconstructed
        indentation -- confirm against upstream.

        The bound port is stored in self.port.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and block until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the accept loop and wait for the server thread to finish."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set by run() once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, one handler at a time."""
        # The short accept() timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Connections are served strictly one after another.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to exit."""
        self.active = False
2443
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread that drives an asyncore-based TLS server.

    Used as a context manager, the thread is started on entry and stopped,
    joined and its asyncore channels closed on exit.
    """

    # this one's based on asyncore.dispatcher

    # NOTE(review): the block nesting below was reconstructed from a listing
    # whose indentation had been stripped -- confirm against upstream.

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection channel: finishes the TLS handshake
            incrementally, then echoes received data in lower case."""

            def __init__(self, conn, certfile):
                # The handshake is deferred so it can be driven step by step
                # from _do_ssl_handshake().
                self.socket = test_wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Process data still pending on the SSL socket, then report
                the channel as readable."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake one step.

                WANT_READ/WANT_WRITE leave the handshake pending; EOF and
                ECONNABORTED close the channel; any other SSLError
                propagates.  On success _ssl_accepting is cleared.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Must precede the OSError clause (SSLError subclasses it).
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased;
                empty data closes the channel."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                """Close the channel, reporting it when verbose."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            """Bind a listening TCP socket; the port is stored in self.port."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Create a ConnectionHandler for the accepted socket."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        """Create the EchoServer for *certfile* and expose its port."""
        # Optional event set by run() once the thread is active.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the thread and block until run() has signalled readiness."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop and join the thread, then close all asyncore channels."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set by run() on start-up."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Best effort: every exception escaping the loop is
                # discarded and the loop is re-entered.
                pass

    def stop(self):
        """Clear the active flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002561
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect to it with a
    socket wrapped by *client_context* and exchange *indata* as bytes,
    bytearray and memoryview, checking that each reply is the lower-cased
    input.

    Returns a dict describing the client connection (compression, cipher,
    peer certificate, ALPN/NPN protocol, TLS version, session data) plus
    the protocols and shared ciphers recorded by the server.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo handler to close its side of the connection.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats.update(
                compression=s.compression(),
                cipher=s.cipher(),
                peercert=s.getpeercert(),
                client_alpn_protocol=s.selected_alpn_protocol(),
                client_npn_protocol=s.selected_npn_protocol(),
                version=s.version(),
                session_reused=s.session_reused,
                session=s.session,
            )
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002611
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server and check the outcome.

    A true *expect_success* means the connection must succeed, a false
    one means it must fail.  When *expect_success* is a string it must
    additionally equal the protocol version reported for the connection.
    *certsreqs* (default ssl.CERT_NONE) is applied as verify_mode to both
    contexts; *server_options* / *client_options* are OR-ed into the
    respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    can_lower_minimum = (min_version is not None
                         and hasattr(server_context, 'minimum_version')
                         and server_protocol == ssl.PROTOCOL_TLS)
    if can_lower_minimum and server_context.minimum_version > min_version:
        # If OpenSSL configuration is strict and requires more recent TLS
        # version, we have to change the minimum to test old TLS versions.
        server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2681
2682
2683class ThreadedTests(unittest.TestCase):
2684
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # The role-specific protocols cannot share one context for both
    # ends; they are covered separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Each entry: (subTest labels, arguments for server_params_test).
    # NOTE(review): the last entry is labelled client=PROTOCOL_TLS_CLIENT
    # but passes server_context as the client context, i.e. the same
    # swap as the first entry minus sni_name -- confirm the intent.
    wrong_role_cases = (
        (dict(client=ssl.PROTOCOL_TLS_SERVER,
              server=ssl.PROTOCOL_TLS_CLIENT),
         dict(client_context=server_context,
              server_context=client_context,
              sni_name=hostname)),
        (dict(client=ssl.PROTOCOL_TLS_SERVER,
              server=ssl.PROTOCOL_TLS_SERVER),
         dict(client_context=server_context,
              server_context=server_context)),
        (dict(client=ssl.PROTOCOL_TLS_CLIENT,
              server=ssl.PROTOCOL_TLS_CLIENT),
         dict(client_context=server_context,
              server_context=client_context)),
    )
    for labels, contexts in wrong_role_cases:
        with self.subTest(**labels):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(chatty=True, connectionchatty=True,
                                   **contexts)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2731
def test_getpeercert(self):
    """Check getpeercert() before and after an explicit handshake."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # Until the handshake has run, getpeercert() raises ValueError.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, cert)
        # The validity window must be non-empty.
        before = ssl.cert_time_to_seconds(cert['notBefore'])
        after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002767
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # Step 1: with VERIFY_DEFAULT the connection is accepted.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # Step 2: VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails.
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # Step 3: load a CRL file (signed by the CA); verification passes.
    client_context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002809
def test_check_hostname(self):
    """Check hostname matching: good name, bad name and missing name."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # A matching hostname verifies.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # A non-matching hostname is rejected during connect().
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as s:
        mismatch = "Hostname mismatch, certificate is not valid for 'invalid'."
        with self.assertRaisesRegex(ssl.CertificateError, mismatch):
            s.connect((HOST, server.port))

    # Omitting server_hostname is refused already by wrap_socket().
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as s:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002842
def test_ecc_cert(self):
    """Connect to a server that only has an ECC certificate.

    The client is restricted to ECDHE/ECDSA cipher suites and must obtain
    a peer certificate; for pre-TLS 1.3 connections the negotiated suite
    name must start with ECDHE-ECDSA.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple as
            # the failure message and never compared anything.  TLS 1.3
            # suite names (e.g. TLS_AES_256_GCM_SHA384) carry no key
            # exchange / authentication prefix, so only compare for
            # older protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2863
def test_dual_rsa_ecc(self):
    """Connect to a server holding both an ECC and an RSA certificate.

    The client only allows ECDHE/ECDSA suites (TLS 1.3 disabled), so the
    negotiated suite name must start with ECDHE-ECDSA.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertTrue(cipher[:2], (...)) treated the tuple as the
            # failure message and always passed; actually compare the
            # key-exchange and authentication parts of the suite name.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2889
def test_check_hostname_idn(self):
    """Check hostname verification with internationalized domain names.

    Every case pairs the server_hostname given to wrap_socket() with the
    value the wrapped socket is expected to report as server_hostname,
    both before and after the connection is made.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # The same host written as unicode str, A-label str and A-label
    # bytes must all verify.
    idn_hostnames = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # The unicode spelling of the IDNA 2008 name is left disabled:
        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
    ]
    for server_hostname, expected_hostname in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, context.wrap_socket(
                socket.socket(), server_hostname=server_hostname) as s:
            self.assertEqual(s.server_hostname, expected_hostname)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_hostname)
            self.assertTrue(cert, "Can't get peer certificate.")

    # A hostname the certificate does not cover must be rejected.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2945
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    The server runs with CERT_REQUIRED and the client presents a
    certificate that is not signed by the trusted CA; connect() has to
    fail.  The client is capped at TLS 1.2.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # Accepted outcomes: an SSL error reporting the rejection, or
            # a low-level connection reset (sometimes seen on Windows).
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2982
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """Like the TLS 1.2 variant, but with both ends pinned to TLS 1.3.

    connect() itself is expected to succeed here; the failure has to
    show up on the first write/read afterwards.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3013
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    accepting = threading.Event()
    hung_up = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    def listener():
        # Runs in a helper thread: accept one connection, drop it and
        # the listening socket right away, then signal the main thread.
        s.listen()
        accepting.set()
        newsock, addr = s.accept()
        newsock.close()
        s.close()
        hung_up.set()

    def connector():
        # Runs in the main thread: connect in plain text, wait until the
        # peer is gone, then try to wrap the dead connection.
        accepting.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            hung_up.wait()
            try:
                test_wrap_socket(c)
            except OSError:
                return
            self.fail('connecting to closed SSL socket should have failed')

    t = threading.Thread(target=listener)
    t.start()
    try:
        connector()
    finally:
        t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003054
def test_ssl_cert_verify_error(self):
    """Check the details carried by a certificate verification failure.

    The client context has no CA loaded, so connect() must raise an
    SSLCertVerificationError whose verify_code / verify_message describe
    the missing issuer certificate.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail when the handshake
            # unexpectedly succeeds; a bare try/except without an else
            # branch would pass silently in that case.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
3077
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 client: succeeds for every certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)

    # Any other client protocol is expected to fail.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003100
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three clients for each certificate requirement
    # (None selects try_protocol_combo's default).
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, certsreqs)
        try_protocol_combo(tls, tls, True, certsreqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3138
3139
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 client: must negotiate 'SSLv3' for every cert requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)

    # Every other client is expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3158
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 client: must negotiate 'TLSv1' for every cert requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)

    # Legacy SSL clients (where compiled in) are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3172
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Legacy SSL clients (where compiled in) are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # Fixed-version mismatches fail in both directions.
    mismatches = (
        (tlsv1_1, ssl.PROTOCOL_TLSv1),
        (ssl.PROTOCOL_TLSv1, tlsv1_1),
    )
    for server_protocol, client_protocol in mismatches:
        try_protocol_combo(server_protocol, client_protocol, False)
3191
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy SSL clients (where compiled in) are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # Fixed-version mismatches fail in both directions.
    mismatches = (
        (tlsv1_2, ssl.PROTOCOL_TLSv1),
        (ssl.PROTOCOL_TLSv1, tlsv1_2),
        (tlsv1_2, ssl.PROTOCOL_TLSv1_1),
        (ssl.PROTOCOL_TLSv1_1, tlsv1_2),
    )
    for server_protocol, client_protocol in mismatches:
        try_protocol_combo(server_protocol, client_protocol, False)
3214
3215 def test_starttls(self):
3216 """Switching from clear text to encrypted and back again."""
3217 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3218
3219 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003220 starttls_server=True,
3221 chatty=True,
3222 connectionchatty=True)
3223 wrapped = False
3224 with server:
3225 s = socket.socket()
3226 s.setblocking(1)
3227 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003228 if support.verbose:
3229 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003230 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003231 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003232 sys.stdout.write(
3233 " client: sending %r...\n" % indata)
3234 if wrapped:
3235 conn.write(indata)
3236 outdata = conn.read()
3237 else:
3238 s.send(indata)
3239 outdata = s.recv(1024)
3240 msg = outdata.strip().lower()
3241 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3242 # STARTTLS ok, switch to secure mode
3243 if support.verbose:
3244 sys.stdout.write(
3245 " client: read %r from server, starting TLS...\n"
3246 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003247 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003248 wrapped = True
3249 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3250 # ENDTLS ok, switch back to clear text
3251 if support.verbose:
3252 sys.stdout.write(
3253 " client: read %r from server, ending TLS...\n"
3254 % msg)
3255 s = conn.unwrap()
3256 wrapped = False
3257 else:
3258 if support.verbose:
3259 sys.stdout.write(
3260 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003261 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003262 sys.stdout.write(" client: closing connection.\n")
3263 if wrapped:
3264 conn.write(b"over\n")
3265 else:
3266 s.send(b"over\n")
3267 if wrapped:
3268 conn.close()
3269 else:
3270 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003271
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003272 def test_socketserver(self):
3273 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003274 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003275 # try to connect
3276 if support.verbose:
3277 sys.stdout.write('\n')
3278 with open(CERTFILE, 'rb') as f:
3279 d1 = f.read()
3280 d2 = ''
3281 # now fetch the same data from the HTTPS server
3282 url = 'https://localhost:%d/%s' % (
3283 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003284 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003285 f = urllib.request.urlopen(url, context=context)
3286 try:
3287 dlen = f.info().get("content-length")
3288 if dlen and (int(dlen) > 0):
3289 d2 = f.read(int(dlen))
3290 if support.verbose:
3291 sys.stdout.write(
3292 " client: read %d bytes from remote server '%s'\n"
3293 % (len(d2), server))
3294 finally:
3295 f.close()
3296 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003297
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003298 def test_asyncore_server(self):
3299 """Check the example asyncore integration."""
3300 if support.verbose:
3301 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003303 indata = b"FOO\n"
3304 server = AsyncoreEchoServer(CERTFILE)
3305 with server:
3306 s = test_wrap_socket(socket.socket())
3307 s.connect(('127.0.0.1', server.port))
3308 if support.verbose:
3309 sys.stdout.write(
3310 " client: sending %r...\n" % indata)
3311 s.write(indata)
3312 outdata = s.read()
3313 if support.verbose:
3314 sys.stdout.write(" client: read %r\n" % outdata)
3315 if outdata != indata.lower():
3316 self.fail(
3317 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3318 % (outdata[:20], len(outdata),
3319 indata[:20].lower(), len(indata)))
3320 s.write(b"over\n")
3321 if support.verbose:
3322 sys.stdout.write(" client: closing connection.\n")
3323 s.close()
3324 if support.verbose:
3325 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003326
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003327 def test_recv_send(self):
3328 """Test recv(), send() and friends."""
3329 if support.verbose:
3330 sys.stdout.write("\n")
3331
3332 server = ThreadedEchoServer(CERTFILE,
3333 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003334 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003335 cacerts=CERTFILE,
3336 chatty=True,
3337 connectionchatty=False)
3338 with server:
3339 s = test_wrap_socket(socket.socket(),
3340 server_side=False,
3341 certfile=CERTFILE,
3342 ca_certs=CERTFILE,
3343 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003344 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003345 s.connect((HOST, server.port))
3346 # helper methods for standardising recv* method signatures
3347 def _recv_into():
3348 b = bytearray(b"\0"*100)
3349 count = s.recv_into(b)
3350 return b[:count]
3351
3352 def _recvfrom_into():
3353 b = bytearray(b"\0"*100)
3354 count, addr = s.recvfrom_into(b)
3355 return b[:count]
3356
3357 # (name, method, expect success?, *args, return value func)
3358 send_methods = [
3359 ('send', s.send, True, [], len),
3360 ('sendto', s.sendto, False, ["some.address"], len),
3361 ('sendall', s.sendall, True, [], lambda x: None),
3362 ]
3363 # (name, method, whether to expect success, *args)
3364 recv_methods = [
3365 ('recv', s.recv, True, []),
3366 ('recvfrom', s.recvfrom, False, ["some.address"]),
3367 ('recv_into', _recv_into, True, []),
3368 ('recvfrom_into', _recvfrom_into, False, []),
3369 ]
3370 data_prefix = "PREFIX_"
3371
3372 for (meth_name, send_meth, expect_success, args,
3373 ret_val_meth) in send_methods:
3374 indata = (data_prefix + meth_name).encode('ascii')
3375 try:
3376 ret = send_meth(indata, *args)
3377 msg = "sending with {}".format(meth_name)
3378 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3379 outdata = s.read()
3380 if outdata != indata.lower():
3381 self.fail(
3382 "While sending with <<{name:s}>> bad data "
3383 "<<{outdata:r}>> ({nout:d}) received; "
3384 "expected <<{indata:r}>> ({nin:d})\n".format(
3385 name=meth_name, outdata=outdata[:20],
3386 nout=len(outdata),
3387 indata=indata[:20], nin=len(indata)
3388 )
3389 )
3390 except ValueError as e:
3391 if expect_success:
3392 self.fail(
3393 "Failed to send with method <<{name:s}>>; "
3394 "expected to succeed.\n".format(name=meth_name)
3395 )
3396 if not str(e).startswith(meth_name):
3397 self.fail(
3398 "Method <<{name:s}>> failed with unexpected "
3399 "exception message: {exp:s}\n".format(
3400 name=meth_name, exp=e
3401 )
3402 )
3403
3404 for meth_name, recv_meth, expect_success, args in recv_methods:
3405 indata = (data_prefix + meth_name).encode('ascii')
3406 try:
3407 s.send(indata)
3408 outdata = recv_meth(*args)
3409 if outdata != indata.lower():
3410 self.fail(
3411 "While receiving with <<{name:s}>> bad data "
3412 "<<{outdata:r}>> ({nout:d}) received; "
3413 "expected <<{indata:r}>> ({nin:d})\n".format(
3414 name=meth_name, outdata=outdata[:20],
3415 nout=len(outdata),
3416 indata=indata[:20], nin=len(indata)
3417 )
3418 )
3419 except ValueError as e:
3420 if expect_success:
3421 self.fail(
3422 "Failed to receive with method <<{name:s}>>; "
3423 "expected to succeed.\n".format(name=meth_name)
3424 )
3425 if not str(e).startswith(meth_name):
3426 self.fail(
3427 "Method <<{name:s}>> failed with unexpected "
3428 "exception message: {exp:s}\n".format(
3429 name=meth_name, exp=e
3430 )
3431 )
3432 # consume data
3433 s.read()
3434
3435 # read(-1, buffer) is supported, even though read(-1) is not
3436 data = b"data"
3437 s.send(data)
3438 buffer = bytearray(len(data))
3439 self.assertEqual(s.read(-1, buffer), len(data))
3440 self.assertEqual(buffer, data)
3441
Christian Heimes888bbdc2017-09-07 14:18:21 -07003442 # sendall accepts bytes-like objects
3443 if ctypes is not None:
3444 ubyte = ctypes.c_ubyte * len(data)
3445 byteslike = ubyte.from_buffer_copy(data)
3446 s.sendall(byteslike)
3447 self.assertEqual(s.read(), data)
3448
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003449 # Make sure sendmsg et al are disallowed to avoid
3450 # inadvertent disclosure of data and/or corruption
3451 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003452 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003453 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3454 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3455 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003456 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003457 s.write(b"over\n")
3458
3459 self.assertRaises(ValueError, s.recv, -1)
3460 self.assertRaises(ValueError, s.read, -1)
3461
3462 s.close()
3463
3464 def test_recv_zero(self):
3465 server = ThreadedEchoServer(CERTFILE)
3466 server.__enter__()
3467 self.addCleanup(server.__exit__, None, None)
3468 s = socket.create_connection((HOST, server.port))
3469 self.addCleanup(s.close)
3470 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3471 self.addCleanup(s.close)
3472
3473 # recv/read(0) should return no data
3474 s.send(b"data")
3475 self.assertEqual(s.recv(0), b"")
3476 self.assertEqual(s.read(0), b"")
3477 self.assertEqual(s.read(), b"data")
3478
3479 # Should not block if the other end sends no data
3480 s.setblocking(False)
3481 self.assertEqual(s.recv(0), b"")
3482 self.assertEqual(s.recv_into(bytearray()), 0)
3483
3484 def test_nonblocking_send(self):
3485 server = ThreadedEchoServer(CERTFILE,
3486 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003487 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003488 cacerts=CERTFILE,
3489 chatty=True,
3490 connectionchatty=False)
3491 with server:
3492 s = test_wrap_socket(socket.socket(),
3493 server_side=False,
3494 certfile=CERTFILE,
3495 ca_certs=CERTFILE,
3496 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003497 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003498 s.connect((HOST, server.port))
3499 s.setblocking(False)
3500
3501 # If we keep sending data, at some point the buffers
3502 # will be full and the call will block
3503 buf = bytearray(8192)
3504 def fill_buffer():
3505 while True:
3506 s.send(buf)
3507 self.assertRaises((ssl.SSLWantWriteError,
3508 ssl.SSLWantReadError), fill_buffer)
3509
3510 # Now read all the output and discard it
3511 s.setblocking(True)
3512 s.close()
3513
    def test_handshake_timeout(self):
        # Issue #5103: SSL handshake must respect the socket timeout
        #
        # The "server" here is a plain TCP socket: it accepts connections
        # but never sends anything, so a client-side TLS handshake can
        # only end through the 0.2s socket timeout set below.
        server = socket.socket(socket.AF_INET)
        host = "127.0.0.1"
        port = support.bind_port(server)
        started = threading.Event()
        # Flag read by serve() through its closure; set to True in the
        # outer "finally" to make the accept loop exit.
        finish = False

        def serve():
            server.listen()
            # Tell the main thread that connections can now be made.
            started.set()
            conns = []
            while not finish:
                # Poll with a short timeout so the finish flag is
                # re-checked regularly.
                r, w, e = select.select([server], [], [], 0.1)
                if server in r:
                    # Let the socket hang around rather than having
                    # it closed by garbage collection.
                    conns.append(server.accept()[0])
            for sock in conns:
                sock.close()

        t = threading.Thread(target=serve)
        t.start()
        started.wait()

        try:
            # Case 1: connect first, then wrap the connected socket.
            try:
                c = socket.socket(socket.AF_INET)
                c.settimeout(0.2)
                c.connect((host, port))
                # Will attempt handshake and time out
                self.assertRaisesRegex(socket.timeout, "timed out",
                                       test_wrap_socket, c)
            finally:
                c.close()
            # Case 2: wrap first, then connect the wrapped socket.
            try:
                c = socket.socket(socket.AF_INET)
                c = test_wrap_socket(c)
                c.settimeout(0.2)
                # Will attempt handshake and time out
                self.assertRaisesRegex(socket.timeout, "timed out",
                                       c.connect, (host, port))
            finally:
                c.close()
        finally:
            # Stop the accept loop, wait for the thread, then release
            # the listening socket.
            finish = True
            t.join()
            server.close()
3562
3563 def test_server_accept(self):
3564 # Issue #16357: accept() on a SSLSocket created through
3565 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003566 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003567 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003568 context.load_verify_locations(SIGNING_CA)
3569 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003570 server = socket.socket(socket.AF_INET)
3571 host = "127.0.0.1"
3572 port = support.bind_port(server)
3573 server = context.wrap_socket(server, server_side=True)
3574 self.assertTrue(server.server_side)
3575
3576 evt = threading.Event()
3577 remote = None
3578 peer = None
3579 def serve():
3580 nonlocal remote, peer
3581 server.listen()
3582 # Block on the accept and wait on the connection to close.
3583 evt.set()
3584 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003585 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003586
3587 t = threading.Thread(target=serve)
3588 t.start()
3589 # Client wait until server setup and perform a connect.
3590 evt.wait()
3591 client = context.wrap_socket(socket.socket())
3592 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003593 client.send(b'data')
3594 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003595 client_addr = client.getsockname()
3596 client.close()
3597 t.join()
3598 remote.close()
3599 server.close()
3600 # Sanity checks.
3601 self.assertIsInstance(remote, ssl.SSLSocket)
3602 self.assertEqual(peer, client_addr)
3603
3604 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003605 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003606 with context.wrap_socket(socket.socket()) as sock:
3607 with self.assertRaises(OSError) as cm:
3608 sock.getpeercert()
3609 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3610
3611 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003612 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003613 with context.wrap_socket(socket.socket()) as sock:
3614 with self.assertRaises(OSError) as cm:
3615 sock.do_handshake()
3616 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3617
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003618 def test_no_shared_ciphers(self):
3619 client_context, server_context, hostname = testing_context()
3620 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3621 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003622 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003623 client_context.set_ciphers("AES128")
3624 server_context.set_ciphers("AES256")
3625 with ThreadedEchoServer(context=server_context) as server:
3626 with client_context.wrap_socket(socket.socket(),
3627 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003628 with self.assertRaises(OSError):
3629 s.connect((HOST, server.port))
3630 self.assertIn("no shared cipher", server.conn_errors[0])
3631
3632 def test_version_basic(self):
3633 """
3634 Basic tests for SSLSocket.version().
3635 More tests are done in the test_protocol_*() methods.
3636 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003637 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3638 context.check_hostname = False
3639 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003640 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003641 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003642 chatty=False) as server:
3643 with context.wrap_socket(socket.socket()) as s:
3644 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003645 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003646 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003647 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003648 self.assertEqual(s.version(), 'TLSv1.3')
3649 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003650 self.assertEqual(s.version(), 'TLSv1.2')
3651 else: # 0.9.8 to 1.0.1
3652 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003653 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003654 self.assertIs(s.version(), None)
3655
Christian Heimescb5b68a2017-09-07 18:07:00 -07003656 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3657 "test requires TLSv1.3 enabled OpenSSL")
3658 def test_tls1_3(self):
3659 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3660 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003661 context.options |= (
3662 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3663 )
3664 with ThreadedEchoServer(context=context) as server:
3665 with context.wrap_socket(socket.socket()) as s:
3666 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003667 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003668 'TLS_AES_256_GCM_SHA384',
3669 'TLS_CHACHA20_POLY1305_SHA256',
3670 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003671 })
3672 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003673
Christian Heimes698dde12018-02-27 11:54:43 +01003674 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3675 "required OpenSSL 1.1.0g")
3676 def test_min_max_version(self):
3677 client_context, server_context, hostname = testing_context()
3678 # client TLSv1.0 to 1.2
3679 client_context.minimum_version = ssl.TLSVersion.TLSv1
3680 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3681 # server only TLSv1.2
3682 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3683 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3684
3685 with ThreadedEchoServer(context=server_context) as server:
3686 with client_context.wrap_socket(socket.socket(),
3687 server_hostname=hostname) as s:
3688 s.connect((HOST, server.port))
3689 self.assertEqual(s.version(), 'TLSv1.2')
3690
3691 # client 1.0 to 1.2, server 1.0 to 1.1
3692 server_context.minimum_version = ssl.TLSVersion.TLSv1
3693 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3694
3695 with ThreadedEchoServer(context=server_context) as server:
3696 with client_context.wrap_socket(socket.socket(),
3697 server_hostname=hostname) as s:
3698 s.connect((HOST, server.port))
3699 self.assertEqual(s.version(), 'TLSv1.1')
3700
3701 # client 1.0, server 1.2 (mismatch)
3702 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3703 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3704 client_context.minimum_version = ssl.TLSVersion.TLSv1
3705 client_context.maximum_version = ssl.TLSVersion.TLSv1
3706 with ThreadedEchoServer(context=server_context) as server:
3707 with client_context.wrap_socket(socket.socket(),
3708 server_hostname=hostname) as s:
3709 with self.assertRaises(ssl.SSLError) as e:
3710 s.connect((HOST, server.port))
3711 self.assertIn("alert", str(e.exception))
3712
3713
3714 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3715 "required OpenSSL 1.1.0g")
3716 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3717 def test_min_max_version_sslv3(self):
3718 client_context, server_context, hostname = testing_context()
3719 server_context.minimum_version = ssl.TLSVersion.SSLv3
3720 client_context.minimum_version = ssl.TLSVersion.SSLv3
3721 client_context.maximum_version = ssl.TLSVersion.SSLv3
3722 with ThreadedEchoServer(context=server_context) as server:
3723 with client_context.wrap_socket(socket.socket(),
3724 server_hostname=hostname) as s:
3725 s.connect((HOST, server.port))
3726 self.assertEqual(s.version(), 'SSLv3')
3727
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003728 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3729 def test_default_ecdh_curve(self):
3730 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3731 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003732 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003733 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003734 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3735 # cipher name.
3736 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003737 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3738 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3739 # our default cipher list should prefer ECDH-based ciphers
3740 # automatically.
3741 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3742 context.set_ciphers("ECCdraft:ECDH")
3743 with ThreadedEchoServer(context=context) as server:
3744 with context.wrap_socket(socket.socket()) as s:
3745 s.connect((HOST, server.port))
3746 self.assertIn("ECDH", s.cipher()[0])
3747
3748 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3749 "'tls-unique' channel binding not available")
3750 def test_tls_unique_channel_binding(self):
3751 """Test tls-unique channel binding."""
3752 if support.verbose:
3753 sys.stdout.write("\n")
3754
Christian Heimes05d9fe32018-02-27 08:55:39 +01003755 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003756
3757 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003758 chatty=True,
3759 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003760
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003761 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003762 with client_context.wrap_socket(
3763 socket.socket(),
3764 server_hostname=hostname) as s:
3765 s.connect((HOST, server.port))
3766 # get the data
3767 cb_data = s.get_channel_binding("tls-unique")
3768 if support.verbose:
3769 sys.stdout.write(
3770 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003771
Christian Heimes05d9fe32018-02-27 08:55:39 +01003772 # check if it is sane
3773 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003774 if s.version() == 'TLSv1.3':
3775 self.assertEqual(len(cb_data), 48)
3776 else:
3777 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003778
Christian Heimes05d9fe32018-02-27 08:55:39 +01003779 # and compare with the peers version
3780 s.write(b"CB tls-unique\n")
3781 peer_data_repr = s.read().strip()
3782 self.assertEqual(peer_data_repr,
3783 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003784
3785 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003786 with client_context.wrap_socket(
3787 socket.socket(),
3788 server_hostname=hostname) as s:
3789 s.connect((HOST, server.port))
3790 new_cb_data = s.get_channel_binding("tls-unique")
3791 if support.verbose:
3792 sys.stdout.write(
3793 "got another channel binding data: {0!r}\n".format(
3794 new_cb_data)
3795 )
3796 # is it really unique
3797 self.assertNotEqual(cb_data, new_cb_data)
3798 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003799 if s.version() == 'TLSv1.3':
3800 self.assertEqual(len(cb_data), 48)
3801 else:
3802 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003803 s.write(b"CB tls-unique\n")
3804 peer_data_repr = s.read().strip()
3805 self.assertEqual(peer_data_repr,
3806 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003807
3808 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003809 client_context, server_context, hostname = testing_context()
3810 stats = server_params_test(client_context, server_context,
3811 chatty=True, connectionchatty=True,
3812 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003813 if support.verbose:
3814 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3815 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3816
3817 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3818 "ssl.OP_NO_COMPRESSION needed for this test")
3819 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003820 client_context, server_context, hostname = testing_context()
3821 client_context.options |= ssl.OP_NO_COMPRESSION
3822 server_context.options |= ssl.OP_NO_COMPRESSION
3823 stats = server_params_test(client_context, server_context,
3824 chatty=True, connectionchatty=True,
3825 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003826 self.assertIs(stats['compression'], None)
3827
3828 def test_dh_params(self):
3829 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003830 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003831 # test scenario needs TLS <= 1.2
3832 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003833 server_context.load_dh_params(DHFILE)
3834 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003835 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003836 stats = server_params_test(client_context, server_context,
3837 chatty=True, connectionchatty=True,
3838 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003839 cipher = stats["cipher"][0]
3840 parts = cipher.split("-")
3841 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3842 self.fail("Non-DH cipher: " + cipher[0])
3843
Christian Heimesb7b92252018-02-25 09:49:31 +01003844 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003845 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003846 def test_ecdh_curve(self):
3847 # server secp384r1, client auto
3848 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003849
Christian Heimesb7b92252018-02-25 09:49:31 +01003850 server_context.set_ecdh_curve("secp384r1")
3851 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3852 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3853 stats = server_params_test(client_context, server_context,
3854 chatty=True, connectionchatty=True,
3855 sni_name=hostname)
3856
3857 # server auto, client secp384r1
3858 client_context, server_context, hostname = testing_context()
3859 client_context.set_ecdh_curve("secp384r1")
3860 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3861 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3862 stats = server_params_test(client_context, server_context,
3863 chatty=True, connectionchatty=True,
3864 sni_name=hostname)
3865
3866 # server / client curve mismatch
3867 client_context, server_context, hostname = testing_context()
3868 client_context.set_ecdh_curve("prime256v1")
3869 server_context.set_ecdh_curve("secp384r1")
3870 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3871 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3872 try:
3873 stats = server_params_test(client_context, server_context,
3874 chatty=True, connectionchatty=True,
3875 sni_name=hostname)
3876 except ssl.SSLError:
3877 pass
3878 else:
3879 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003880 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003881 self.fail("mismatch curve did not fail")
3882
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003883 def test_selected_alpn_protocol(self):
3884 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003885 client_context, server_context, hostname = testing_context()
3886 stats = server_params_test(client_context, server_context,
3887 chatty=True, connectionchatty=True,
3888 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003889 self.assertIs(stats['client_alpn_protocol'], None)
3890
3891 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3892 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3893 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003894 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003895 server_context.set_alpn_protocols(['foo', 'bar'])
3896 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003897 chatty=True, connectionchatty=True,
3898 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003899 self.assertIs(stats['client_alpn_protocol'], None)
3900
3901 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3902 def test_alpn_protocols(self):
3903 server_protocols = ['foo', 'bar', 'milkshake']
3904 protocol_tests = [
3905 (['foo', 'bar'], 'foo'),
3906 (['bar', 'foo'], 'foo'),
3907 (['milkshake'], 'milkshake'),
3908 (['http/3.0', 'http/4.0'], None)
3909 ]
3910 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003911 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003913 client_context.set_alpn_protocols(client_protocols)
3914
3915 try:
3916 stats = server_params_test(client_context,
3917 server_context,
3918 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003919 connectionchatty=True,
3920 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003921 except ssl.SSLError as e:
3922 stats = e
3923
Christian Heimes05d9fe32018-02-27 08:55:39 +01003924 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003925 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3926 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3927 self.assertIsInstance(stats, ssl.SSLError)
3928 else:
3929 msg = "failed trying %s (s) and %s (c).\n" \
3930 "was expecting %s, but got %%s from the %%s" \
3931 % (str(server_protocols), str(client_protocols),
3932 str(expected))
3933 client_result = stats['client_alpn_protocol']
3934 self.assertEqual(client_result, expected,
3935 msg % (client_result, "client"))
3936 server_result = stats['server_alpn_protocols'][-1] \
3937 if len(stats['server_alpn_protocols']) else 'nothing'
3938 self.assertEqual(server_result, expected,
3939 msg % (server_result, "server"))
3940
3941 def test_selected_npn_protocol(self):
3942 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003943 client_context, server_context, hostname = testing_context()
3944 stats = server_params_test(client_context, server_context,
3945 chatty=True, connectionchatty=True,
3946 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003947 self.assertIs(stats['client_npn_protocol'], None)
3948
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Check the NPN result for several client protocol lists.

    The server side always advertises the same list; every case pairs a
    client list with the protocol both peers are expected to report.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    cases = (
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    )
    for client_protocols, expected in cases:
        # Fresh contexts per case so the protocol lists never accumulate.
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)

        # The fixed parts are filled in here; the doubled %% placeholders
        # survive for the observed value and the side that reported it.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s"
                    % (str(server_protocols), str(client_protocols),
                       str(expected)))

        from_client = stats['client_npn_protocol']
        self.assertEqual(from_client, expected,
                         template % (from_client, "client"))

        # Compare against the last entry the server recorded, if any.
        if len(stats['server_npn_protocols']):
            from_server = stats['server_npn_protocols'][-1]
        else:
            from_server = 'nothing'
        self.assertEqual(from_server, expected,
                         template % (from_server, "server"))
3974
def sni_contexts(self):
    """Build the three contexts shared by the SNI callback tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    two TLS server contexts loaded with SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively, and a TLS client context with
    SIGNING_CA loaded as its verify location.
    """
    def make_server(certfile):
        # Server-side context presenting the given certificate chain.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = make_server(SIGNED_CERTFILE)
    alternate = make_server(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
3983
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must provide a ``'peercert'`` mapping with a ``'subject'``
    entry; the check looks for the one-element RDN
    ``(('commonName', name),)`` inside that subject.
    """
    subject = stats['peercert']['subject']
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, subject)
3987
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback with SNI, without SNI, and unset."""
    server_context, other_context, client_context = self.sni_contexts()
    client_context.check_hostname = False

    seen = []

    def record_and_switch(ssl_sock, server_name, initial_context):
        # Log every invocation; swap in other_context only when the
        # client actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    server_context.set_servername_callback(record_and_switch)

    # Case 1: the client sends a server name.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The callback saw the requested name together with the original
    # context ...
    self.assertEqual(seen, [("supermessage", server_context)])
    # ... and the connection was served with other_context's certificate
    # (SIGNED_CERTFILE2, as loaded by sni_contexts()).
    self.check_common_name(stats, 'fakehostname')

    # Case 2: no server name, so the callback receives server_name=None
    # and the certificate stays the default one.
    seen.clear()
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Case 3: with the callback removed nothing is recorded and the
    # certificate does not change even though a name is sent.
    seen.clear()
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4028
@needs_sni
def test_sni_callback_alert(self):
    """An alert code returned by the callback is seen by the client."""
    server_context, _other_context, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        # Returning an ALERT_DESCRIPTION_* value rejects the handshake.
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004042
@needs_sni
def test_sni_callback_raising(self):
    """An exception in the callback fails the handshake.

    The client must see a handshake-failure alert and the exception name
    must show up on stderr.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def explode(ssl_sock, server_name, initial_context):
        # Deliberately raises ZeroDivisionError.
        1 / 0

    server_context.set_servername_callback(explode)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004059
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """A callback returning a non-integer ends the handshake.

    The client must see an internal-error alert and a TypeError must be
    reported on stderr.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def return_text(ssl_sock, server_name, initial_context):
        # A string is not a valid return value for the callback.
        return "foo"

    server_context.set_servername_callback(return_text)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004077
def test_shared_ciphers(self):
    """Every cipher the server reports as shared must be an expected one."""
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")

    # A shared cipher name must contain at least one of these fragments.
    # The TLS_* entries are there because TLS 1.3 ciphers are always
    # enabled.
    allowed_fragments = ("AES256", "AES-256", "TLS_CHACHA20", "TLS_AES")

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        for fragment in allowed_fragments:
            if fragment in cipher_name:
                break
        else:
            # No fragment matched: report the offending cipher name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004095
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket raise ValueError."""
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        conn = client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)
        conn.connect((HOST, server.port))
        conn.close()

        # Both directions must be rejected once the socket is closed.
        with self.assertRaises(ValueError):
            conn.read(1024)
        with self.assertRaises(ValueError):
            conn.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004108
def test_sendfile(self):
    """sendfile() on an SSL socket delivers the file content to the peer."""
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context plays both roles: it carries the server certificate and
    # verifies the peer against SIGNING_CA.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as source:
            conn.sendfile(source)
        # The test expects the server to send the bytes straight back.
        self.assertEqual(conn.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004125
def test_session(self):
    """Sessions can be captured, reused, and are counted by the server."""
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One complete client/server exchange against the shared contexts.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def assert_server_counters(accept, hits):
        # Compare the server context's session statistics.
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # 1) first connection, no session offered
    stats = handshake()
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    assert_server_counters(accept=1, hits=0)

    # 2) offer the captured session: the server records a hit
    stats = handshake(session=session)
    assert_server_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, session.id)
    # Equal by value, yet a distinct Python object.
    self.assertEqual(resumed, session)
    self.assertIsNot(resumed, session)
    self.assertGreaterEqual(resumed.time, session.time)
    self.assertGreaterEqual(resumed.timeout, session.timeout)

    # 3) connect again without a session: a brand new one is created
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, session.id)
    self.assertNotEqual(fresh, session)
    assert_server_counters(accept=3, hits=1)

    # 4) the original session is still reusable afterwards
    stats = handshake(session=session)
    self.assertTrue(stats['session_reused'])
    resumed_again = stats['session']
    self.assertEqual(resumed_again.id, session.id)
    self.assertEqual(resumed_again, session)
    self.assertGreaterEqual(resumed_again.time, session.time)
    self.assertGreaterEqual(resumed_again.timeout, session.timeout)
    assert_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004183
def test_session_handling(self):
    """Check when SSLSocket.session may be read and assigned."""
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def new_client(context):
        # Unconnected client-side SSL socket for the given context.
        return context.wrap_socket(socket.socket(),
                                   server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with new_client(client_context) as s:
            # Nothing is available before the handshake.
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            # Only SSLSession objects may be assigned.
            with self.assertRaises(TypeError) as raised:
                s.session = object
            self.assertEqual(str(raised.exception),
                             'Value is not a SSLSession.')

        with new_client(client_context) as s:
            s.connect((HOST, server.port))
            # Too late: the handshake has already happened.
            with self.assertRaises(ValueError) as raised:
                s.session = session
            self.assertEqual(str(raised.exception),
                             'Cannot set session after handshake.')

        with new_client(client_context) as s:
            # Assigning before the connection is established is fine and
            # the session is then reused.
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with new_client(client_context2) as s:
            # A session belongs to the SSLContext that created it; the
            # assignment and the connect stay together inside the
            # assertRaises block, as either step may report the error.
            with self.assertRaises(ValueError) as raised:
                s.session = session
                s.connect((HOST, server.port))
            self.assertEqual(str(raised.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004233
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004234
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    """Tests for TLS 1.3 post-handshake authentication (PHA).

    Apart from test_pha_setter, every test connects a client to a
    ThreadedEchoServer and drives it with the commands b'HASCERT', b'PHA'
    and b'GETCERT', checking the replies that come back.
    """

    def _assert_reply(self, sock, command, expected):
        """Send *command* over *sock* and assert the reply is *expected*."""
        sock.write(command)
        self.assertEqual(sock.recv(1024), expected)

    def test_pha_setter(self):
        """post_handshake_auth and verify_mode can be set independently."""
        protocols = [
            ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
        ]
        for protocol in protocols:
            ctx = ssl.SSLContext(protocol)
            # Disabled by default.
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            # Changing verify_mode must not reset the PHA flag ...
            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            # ... and changing the PHA flag must not reset verify_mode.
            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        """With PHA on both sides the cert only shows up after b'PHA'."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self._assert_reply(s, b'HASCERT', b'FALSE\n')
                self._assert_reply(s, b'PHA', b'OK\n')
                self._assert_reply(s, b'HASCERT', b'TRUE\n')
                # PHA method just returns true when cert is already available
                self._assert_reply(s, b'PHA', b'OK\n')
                s.write(b'GETCERT')
                cert_text = s.recv(4096).decode('us-ascii')
                self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        """CERT_REQUIRED PHA without a client cert ends in a TLS alert."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # receive CertificateRequest
                self._assert_reply(s, b'PHA', b'OK\n')
                # send empty Certificate + Finish
                s.write(b'HASCERT')
                # receive alert
                with self.assertRaisesRegex(
                        ssl.SSLError,
                        'tlsv13 alert certificate required'):
                    s.recv(1024)

    def test_pha_optional(self):
        """CERT_OPTIONAL PHA picks up the client cert when one is loaded."""
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        # check CERT_OPTIONAL
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self._assert_reply(s, b'HASCERT', b'FALSE\n')
                self._assert_reply(s, b'PHA', b'OK\n')
                self._assert_reply(s, b'HASCERT', b'TRUE\n')

    def test_pha_optional_nocert(self):
        """CERT_OPTIONAL PHA succeeds even if the client has no cert."""
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self._assert_reply(s, b'HASCERT', b'FALSE\n')
                self._assert_reply(s, b'PHA', b'OK\n')
                # optional doesn't fail when client does not have a cert
                self._assert_reply(s, b'HASCERT', b'FALSE\n')

    def test_pha_no_pha_client(self):
        """PHA is refused when the client did not enable it."""
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # The request can only be issued from the server side.
                with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                    s.verify_client_post_handshake()
                s.write(b'PHA')
                self.assertIn(b'extension not received', s.recv(1024))

    def test_pha_no_pha_server(self):
        """Without server-side PHA the cert is requested in the handshake."""
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self._assert_reply(s, b'HASCERT', b'TRUE\n')
                # PHA doesn't fail if there is already a cert
                self._assert_reply(s, b'PHA', b'OK\n')
                self._assert_reply(s, b'HASCERT', b'TRUE\n')

    def test_pha_not_tls13(self):
        """PHA is rejected when the connection is capped at TLS 1.2."""
        # TLS 1.2
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # PHA fails for TLS != 1.3
                s.write(b'PHA')
                self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4406
4407
def test_main(verbose=False):
    """Run the test_ssl test classes.

    The *verbose* argument is accepted but not consulted; diagnostic
    output is controlled by ``support.verbose``.  NetworkedTests is only
    added when the 'network' resource is enabled.

    Raises support.TestFailed if one of the certificate files checked
    below does not exist.
    """
    if support.verbose:
        plats = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first platform probe that returns a non-empty version;
        # otherwise fall back to the generic platform string.
        for name, func in plats.items():
            plat = func()
            if plat and plat[0]:
                plat = '%s %r' % (name, plat)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print("          under %s" % plat)
        print("          HAS_SNI = %r" % ssl.HAS_SNI)
        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print("          OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not exposed by this ssl build; skip the line.
            pass

    # Fail early, with a clear message, if a test fixture is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        # Always run the threading cleanup, even if the run raised.
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004454
# Run the test classes via test_main() when executed as a script.
if __name__ == "__main__":
    test_main()