blob: a72d7913218128e715d72723817e0517a72473e6 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Christian Heimesc7f70692019-05-31 11:44:05 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType, _TLSAlertType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010031PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020033IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010034IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010036PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000037
Victor Stinner3ef63442019-02-19 18:06:03 +010038PROTOCOL_TO_TLS_VERSION = {}
39for proto, ver in (
40 ("PROTOCOL_SSLv23", "SSLv3"),
41 ("PROTOCOL_TLSv1", "TLSv1"),
42 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
43):
44 try:
45 proto = getattr(ssl, proto)
46 ver = getattr(ssl.TLSVersion, ver)
47 except AttributeError:
48 continue
49 PROTOCOL_TO_TLS_VERSION[proto] = ver
50
Christian Heimesefff7062013-11-21 03:35:02 +010051def data_file(*name):
52 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000053
Antoine Pitrou81564092010-10-08 23:06:24 +000054# The custom key and certificate files used in test_ssl are generated
55# using Lib/test/make_ssl_certs.py.
56# Other certificates are simply fetched from the Internet servers they
57# are meant to authenticate.
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000060BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000061ONLYCERT = data_file("ssl_cert.pem")
62ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000063BYTES_ONLYCERT = os.fsencode(ONLYCERT)
64BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020065CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
66ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
67KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000068CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000069BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010070CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
71CAFILE_CACERT = data_file("capath", "5ed36f99.0")
72
Christian Heimesbd5c7d22018-01-20 15:16:30 +010073CERTFILE_INFO = {
74 'issuer': ((('countryName', 'XY'),),
75 (('localityName', 'Castle Anthrax'),),
76 (('organizationName', 'Python Software Foundation'),),
77 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020078 'notAfter': 'Aug 26 14:23:15 2028 GMT',
79 'notBefore': 'Aug 29 14:23:15 2018 GMT',
80 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081 'subject': ((('countryName', 'XY'),),
82 (('localityName', 'Castle Anthrax'),),
83 (('organizationName', 'Python Software Foundation'),),
84 (('commonName', 'localhost'),)),
85 'subjectAltName': (('DNS', 'localhost'),),
86 'version': 3
87}
Antoine Pitrou152efa22010-05-16 18:19:27 +000088
Christian Heimes22587792013-11-21 23:56:13 +010089# empty CRL
90CRLFILE = data_file("revocation.crl")
91
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010092# Two keys and certs signed by the same CA (for SNI tests)
93SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020094SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010095
96SIGNED_CERTFILE_INFO = {
97 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
98 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
99 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
100 'issuer': ((('countryName', 'XY'),),
101 (('organizationName', 'Python Software Foundation CA'),),
102 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200103 'notAfter': 'Jul 7 14:23:16 2028 GMT',
104 'notBefore': 'Aug 29 14:23:16 2018 GMT',
105 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100106 'subject': ((('countryName', 'XY'),),
107 (('localityName', 'Castle Anthrax'),),
108 (('organizationName', 'Python Software Foundation'),),
109 (('commonName', 'localhost'),)),
110 'subjectAltName': (('DNS', 'localhost'),),
111 'version': 3
112}
113
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100114SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200115SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100116SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
117SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
118
Martin Panter3840b2a2016-03-27 01:53:46 +0000119# Same certificate as pycacert.pem, but without extra text in file
120SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200121# cert with all kinds of subject alt names
122ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100123IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100124
Martin Panter3d81d932016-01-14 09:36:00 +0000125REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000126
127EMPTYCERT = data_file("nullcert.pem")
128BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000129NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200131NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200132NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100133TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200135DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100136BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000137
Christian Heimes358cfd42016-09-10 22:43:48 +0200138# Not defined in all versions of OpenSSL
139OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
140OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
141OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
142OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100143OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200144
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100145
Thomas Woutersed03b412007-08-28 21:37:11 +0000146def handle_error(prefix):
147 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000148 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000149 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000150
Antoine Pitroub5218772010-05-21 09:56:06 +0000151def can_clear_options():
152 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200153 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000154
155def no_sslv2_implies_sslv3_hello():
156 # 0.9.7h or higher
157 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
158
Christian Heimes2427b502013-11-23 11:24:32 +0100159def have_verify_flags():
160 # 0.9.8 or higher
161 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
162
Christian Heimesb7b92252018-02-25 09:49:31 +0100163def _have_secp_curves():
164 if not ssl.HAS_ECDH:
165 return False
166 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
167 try:
168 ctx.set_ecdh_curve("secp384r1")
169 except ValueError:
170 return False
171 else:
172 return True
173
174
175HAVE_SECP_CURVES = _have_secp_curves()
176
177
Antoine Pitrouc695c952014-04-28 20:57:36 +0200178def utc_offset(): #NOTE: ignore issues like #1647654
179 # local time = utc time + utc offset
180 if time.daylight and time.localtime().tm_isdst > 0:
181 return -time.altzone # seconds
182 return -time.timezone
183
Christian Heimes9424bb42013-06-17 15:32:57 +0200184def asn1time(cert_time):
185 # Some versions of OpenSSL ignore seconds, see #18207
186 # 0.9.8.i
187 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
188 fmt = "%b %d %H:%M:%S %Y GMT"
189 dt = datetime.datetime.strptime(cert_time, fmt)
190 dt = dt.replace(second=0)
191 cert_time = dt.strftime(fmt)
192 # %d adds leading zero but ASN1_TIME_print() uses leading space
193 if cert_time[4] == "0":
194 cert_time = cert_time[:4] + " " + cert_time[5:]
195
196 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000197
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100198needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
199
Antoine Pitrou23df4832010-08-04 17:14:06 +0000200
Christian Heimesd0486372016-09-10 23:23:33 +0200201def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
202 cert_reqs=ssl.CERT_NONE, ca_certs=None,
203 ciphers=None, certfile=None, keyfile=None,
204 **kwargs):
205 context = ssl.SSLContext(ssl_version)
206 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200207 if cert_reqs == ssl.CERT_NONE:
208 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200209 context.verify_mode = cert_reqs
210 if ca_certs is not None:
211 context.load_verify_locations(ca_certs)
212 if certfile is not None or keyfile is not None:
213 context.load_cert_chain(certfile, keyfile)
214 if ciphers is not None:
215 context.set_ciphers(ciphers)
216 return context.wrap_socket(sock, **kwargs)
217
Christian Heimesa170fa12017-09-15 20:27:30 +0200218
219def testing_context(server_cert=SIGNED_CERTFILE):
220 """Create context
221
222 client_context, server_context, hostname = testing_context()
223 """
224 if server_cert == SIGNED_CERTFILE:
225 hostname = SIGNED_CERTFILE_HOSTNAME
226 elif server_cert == SIGNED_CERTFILE2:
227 hostname = SIGNED_CERTFILE2_HOSTNAME
228 else:
229 raise ValueError(server_cert)
230
231 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
232 client_context.load_verify_locations(SIGNING_CA)
233
234 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
235 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200236 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200237
238 return client_context, server_context, hostname
239
240
Antoine Pitrou152efa22010-05-16 18:19:27 +0000241class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000242
Antoine Pitrou480a1242010-04-28 21:37:09 +0000243 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000244 ssl.CERT_NONE
245 ssl.CERT_OPTIONAL
246 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100247 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100248 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100249 if ssl.HAS_ECDH:
250 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100251 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
252 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000253 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100254 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700255 ssl.OP_NO_SSLv2
256 ssl.OP_NO_SSLv3
257 ssl.OP_NO_TLSv1
258 ssl.OP_NO_TLSv1_3
259 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
260 ssl.OP_NO_TLSv1_1
261 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200262 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000263
Christian Heimes9d50ab52018-02-27 10:17:30 +0100264 def test_private_init(self):
265 with self.assertRaisesRegex(TypeError, "public constructor"):
266 with socket.socket() as s:
267 ssl.SSLSocket(s)
268
Antoine Pitrou172f0252014-04-18 20:33:08 +0200269 def test_str_for_enums(self):
270 # Make sure that the PROTOCOL_* constants have enum-like string
271 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200272 proto = ssl.PROTOCOL_TLS
273 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200274 ctx = ssl.SSLContext(proto)
275 self.assertIs(ctx.protocol, proto)
276
Antoine Pitrou480a1242010-04-28 21:37:09 +0000277 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000279 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 sys.stdout.write("\n RAND_status is %d (%s)\n"
281 % (v, (v and "sufficient randomness") or
282 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200283
284 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
285 self.assertEqual(len(data), 16)
286 self.assertEqual(is_cryptographic, v == 1)
287 if v:
288 data = ssl.RAND_bytes(16)
289 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200290 else:
291 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200292
Victor Stinner1e81a392013-12-19 16:47:04 +0100293 # negative num is invalid
294 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
295 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
296
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100297 if hasattr(ssl, 'RAND_egd'):
298 self.assertRaises(TypeError, ssl.RAND_egd, 1)
299 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000300 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200301 ssl.RAND_add(b"this is a random bytes object", 75.0)
302 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000303
Christian Heimesf77b4b22013-08-21 13:26:05 +0200304 @unittest.skipUnless(os.name == 'posix', 'requires posix')
305 def test_random_fork(self):
306 status = ssl.RAND_status()
307 if not status:
308 self.fail("OpenSSL's PRNG has insufficient randomness")
309
310 rfd, wfd = os.pipe()
311 pid = os.fork()
312 if pid == 0:
313 try:
314 os.close(rfd)
315 child_random = ssl.RAND_pseudo_bytes(16)[0]
316 self.assertEqual(len(child_random), 16)
317 os.write(wfd, child_random)
318 os.close(wfd)
319 except BaseException:
320 os._exit(1)
321 else:
322 os._exit(0)
323 else:
324 os.close(wfd)
325 self.addCleanup(os.close, rfd)
326 _, status = os.waitpid(pid, 0)
327 self.assertEqual(status, 0)
328
329 child_random = os.read(rfd, 16)
330 self.assertEqual(len(child_random), 16)
331 parent_random = ssl.RAND_pseudo_bytes(16)[0]
332 self.assertEqual(len(parent_random), 16)
333
334 self.assertNotEqual(child_random, parent_random)
335
Christian Heimese6dac002018-08-30 07:25:49 +0200336 maxDiff = None
337
Antoine Pitrou480a1242010-04-28 21:37:09 +0000338 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000339 # note that this uses an 'unofficial' function in _ssl.c,
340 # provided solely for this test, to exercise the certificate
341 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100342 self.assertEqual(
343 ssl._ssl._test_decode_cert(CERTFILE),
344 CERTFILE_INFO
345 )
346 self.assertEqual(
347 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
348 SIGNED_CERTFILE_INFO
349 )
350
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200351 # Issue #13034: the subjectAltName in some certificates
352 # (notably projects.developer.nokia.com:443) wasn't parsed
353 p = ssl._ssl._test_decode_cert(NOKIACERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 self.assertEqual(p['subjectAltName'],
357 (('DNS', 'projects.developer.nokia.com'),
358 ('DNS', 'projects.forum.nokia.com'))
359 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100360 # extra OCSP and AIA fields
361 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
362 self.assertEqual(p['caIssuers'],
363 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
364 self.assertEqual(p['crlDistributionPoints'],
365 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
Christian Heimesa37f5242019-01-15 23:47:42 +0100367 def test_parse_cert_CVE_2019_5010(self):
368 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
369 if support.verbose:
370 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
371 self.assertEqual(
372 p,
373 {
374 'issuer': (
375 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
376 'notAfter': 'Jun 14 18:00:58 2028 GMT',
377 'notBefore': 'Jun 18 18:00:58 2018 GMT',
378 'serialNumber': '02',
379 'subject': ((('countryName', 'UK'),),
380 (('commonName',
381 'codenomicon-vm-2.test.lal.cisco.com'),)),
382 'subjectAltName': (
383 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
384 'version': 3
385 }
386 )
387
Christian Heimes824f7f32013-08-17 00:54:47 +0200388 def test_parse_cert_CVE_2013_4238(self):
389 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
390 if support.verbose:
391 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
392 subject = ((('countryName', 'US'),),
393 (('stateOrProvinceName', 'Oregon'),),
394 (('localityName', 'Beaverton'),),
395 (('organizationName', 'Python Software Foundation'),),
396 (('organizationalUnitName', 'Python Core Development'),),
397 (('commonName', 'null.python.org\x00example.org'),),
398 (('emailAddress', 'python-dev@python.org'),))
399 self.assertEqual(p['subject'], subject)
400 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200401 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
402 san = (('DNS', 'altnull.python.org\x00example.com'),
403 ('email', 'null@python.org\x00user@example.org'),
404 ('URI', 'http://null.python.org\x00http://example.org'),
405 ('IP Address', '192.0.2.1'),
406 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
407 else:
408 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
409 san = (('DNS', 'altnull.python.org\x00example.com'),
410 ('email', 'null@python.org\x00user@example.org'),
411 ('URI', 'http://null.python.org\x00http://example.org'),
412 ('IP Address', '192.0.2.1'),
413 ('IP Address', '<invalid>'))
414
415 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200416
Christian Heimes1c03abd2016-09-06 23:25:35 +0200417 def test_parse_all_sans(self):
418 p = ssl._ssl._test_decode_cert(ALLSANFILE)
419 self.assertEqual(p['subjectAltName'],
420 (
421 ('DNS', 'allsans'),
422 ('othername', '<unsupported>'),
423 ('othername', '<unsupported>'),
424 ('email', 'user@example.org'),
425 ('DNS', 'www.example.org'),
426 ('DirName',
427 ((('countryName', 'XY'),),
428 (('localityName', 'Castle Anthrax'),),
429 (('organizationName', 'Python Software Foundation'),),
430 (('commonName', 'dirname example'),))),
431 ('URI', 'https://www.python.org/'),
432 ('IP Address', '127.0.0.1'),
433 ('IP Address', '0:0:0:0:0:0:0:1\n'),
434 ('Registered ID', '1.2.3.4.5')
435 )
436 )
437
Antoine Pitrou480a1242010-04-28 21:37:09 +0000438 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000439 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000440 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000441 d1 = ssl.PEM_cert_to_DER_cert(pem)
442 p2 = ssl.DER_cert_to_PEM_cert(d1)
443 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000444 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000445 if not p2.startswith(ssl.PEM_HEADER + '\n'):
446 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
447 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
448 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000449
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 def test_openssl_version(self):
451 n = ssl.OPENSSL_VERSION_NUMBER
452 t = ssl.OPENSSL_VERSION_INFO
453 s = ssl.OPENSSL_VERSION
454 self.assertIsInstance(n, int)
455 self.assertIsInstance(t, tuple)
456 self.assertIsInstance(s, str)
457 # Some sanity checks follow
458 # >= 0.9
459 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400460 # < 3.0
461 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000462 major, minor, fix, patch, status = t
463 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400464 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000465 self.assertGreaterEqual(minor, 0)
466 self.assertLess(minor, 256)
467 self.assertGreaterEqual(fix, 0)
468 self.assertLess(fix, 256)
469 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100470 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000471 self.assertGreaterEqual(status, 0)
472 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400473 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200474 if IS_LIBRESSL:
475 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100476 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400477 else:
478 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100479 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000480
Antoine Pitrou9d543662010-04-23 23:10:32 +0000481 @support.cpython_only
482 def test_refcycle(self):
483 # Issue #7943: an SSL object doesn't create reference cycles with
484 # itself.
485 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200486 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000487 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100488 with support.check_warnings(("", ResourceWarning)):
489 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100490 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000491
Antoine Pitroua468adc2010-09-14 14:43:44 +0000492 def test_wrapped_unconnected(self):
493 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200494 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000495 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200496 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100497 self.assertRaises(OSError, ss.recv, 1)
498 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
499 self.assertRaises(OSError, ss.recvfrom, 1)
500 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
501 self.assertRaises(OSError, ss.send, b'x')
502 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200503 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100504 self.assertRaises(NotImplementedError, ss.sendmsg,
505 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200506 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
507 self.assertRaises(NotImplementedError, ss.recvmsg_into,
508 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000509
Antoine Pitrou40f08742010-04-24 22:04:40 +0000510 def test_timeout(self):
511 # Issue #8524: when creating an SSL socket, the timeout of the
512 # original socket should be retained.
513 for timeout in (None, 0.0, 5.0):
514 s = socket.socket(socket.AF_INET)
515 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200516 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100517 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000518
Christian Heimesd0486372016-09-10 23:23:33 +0200519 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000520 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000521 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000522 "certfile must be specified",
523 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000524 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000525 "certfile must be specified for server-side operations",
526 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000527 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000528 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200529 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100530 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
531 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200532 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200533 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000534 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000535 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000536 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200537 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000538 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000539 ssl.wrap_socket(sock,
540 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000541 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200542 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000543 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000544 ssl.wrap_socket(sock,
545 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000546 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000547
Martin Panter3464ea22016-02-01 21:58:11 +0000548 def bad_cert_test(self, certfile):
549 """Check that trying to use the given client certificate fails"""
550 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
551 certfile)
552 sock = socket.socket()
553 self.addCleanup(sock.close)
554 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200555 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200556 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000557
558 def test_empty_cert(self):
559 """Wrapping with an empty cert file"""
560 self.bad_cert_test("nullcert.pem")
561
562 def test_malformed_cert(self):
563 """Wrapping with a badly formatted certificate (syntax error)"""
564 self.bad_cert_test("badcert.pem")
565
566 def test_malformed_key(self):
567 """Wrapping with a badly formatted key (syntax error)"""
568 self.bad_cert_test("badkey.pem")
569
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000570 def test_match_hostname(self):
571 def ok(cert, hostname):
572 ssl.match_hostname(cert, hostname)
573 def fail(cert, hostname):
574 self.assertRaises(ssl.CertificateError,
575 ssl.match_hostname, cert, hostname)
576
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100577 # -- Hostname matching --
578
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000579 cert = {'subject': ((('commonName', 'example.com'),),)}
580 ok(cert, 'example.com')
581 ok(cert, 'ExAmple.cOm')
582 fail(cert, 'www.example.com')
583 fail(cert, '.example.com')
584 fail(cert, 'example.org')
585 fail(cert, 'exampleXcom')
586
587 cert = {'subject': ((('commonName', '*.a.com'),),)}
588 ok(cert, 'foo.a.com')
589 fail(cert, 'bar.foo.a.com')
590 fail(cert, 'a.com')
591 fail(cert, 'Xa.com')
592 fail(cert, '.a.com')
593
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 # only match wildcards when they are the only thing
595 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000596 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 fail(cert, 'foo.com')
598 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 fail(cert, 'bar.com')
600 fail(cert, 'foo.a.com')
601 fail(cert, 'bar.foo.com')
602
Christian Heimes824f7f32013-08-17 00:54:47 +0200603 # NULL bytes are bad, CVE-2013-4073
604 cert = {'subject': ((('commonName',
605 'null.python.org\x00example.org'),),)}
606 ok(cert, 'null.python.org\x00example.org') # or raise an error?
607 fail(cert, 'example.org')
608 fail(cert, 'null.python.org')
609
Georg Brandl72c98d32013-10-27 07:16:53 +0100610 # error cases with wildcards
611 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
612 fail(cert, 'bar.foo.a.com')
613 fail(cert, 'a.com')
614 fail(cert, 'Xa.com')
615 fail(cert, '.a.com')
616
617 cert = {'subject': ((('commonName', 'a.*.com'),),)}
618 fail(cert, 'a.foo.com')
619 fail(cert, 'a..com')
620 fail(cert, 'a.com')
621
622 # wildcard doesn't match IDNA prefix 'xn--'
623 idna = 'püthon.python.org'.encode("idna").decode("ascii")
624 cert = {'subject': ((('commonName', idna),),)}
625 ok(cert, idna)
626 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
627 fail(cert, idna)
628 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
629 fail(cert, idna)
630
631 # wildcard in first fragment and IDNA A-labels in sequent fragments
632 # are supported.
633 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
634 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530635 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100637 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
639
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000640 # Slightly fake real-world example
641 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
642 'subject': ((('commonName', 'linuxfrz.org'),),),
643 'subjectAltName': (('DNS', 'linuxfr.org'),
644 ('DNS', 'linuxfr.com'),
645 ('othername', '<unsupported>'))}
646 ok(cert, 'linuxfr.org')
647 ok(cert, 'linuxfr.com')
648 # Not a "DNS" entry
649 fail(cert, '<unsupported>')
650 # When there is a subjectAltName, commonName isn't used
651 fail(cert, 'linuxfrz.org')
652
653 # A pristine real-world example
654 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),),
659 (('commonName', 'mail.google.com'),))}
660 ok(cert, 'mail.google.com')
661 fail(cert, 'gmail.com')
662 # Only commonName is considered
663 fail(cert, 'California')
664
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100665 # -- IPv4 matching --
666 cert = {'subject': ((('commonName', 'example.com'),),),
667 'subjectAltName': (('DNS', 'example.com'),
668 ('IP Address', '10.11.12.13'),
669 ('IP Address', '14.15.16.17'))}
670 ok(cert, '10.11.12.13')
671 ok(cert, '14.15.16.17')
672 fail(cert, '14.15.16.18')
673 fail(cert, 'example.net')
674
675 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100676 if hasattr(socket, 'AF_INET6'):
677 cert = {'subject': ((('commonName', 'example.com'),),),
678 'subjectAltName': (
679 ('DNS', 'example.com'),
680 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
681 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
682 ok(cert, '2001::cafe')
683 ok(cert, '2003::baba')
684 fail(cert, '2003::bebe')
685 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100686
687 # -- Miscellaneous --
688
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000689 # Neither commonName nor subjectAltName
690 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
691 'subject': ((('countryName', 'US'),),
692 (('stateOrProvinceName', 'California'),),
693 (('localityName', 'Mountain View'),),
694 (('organizationName', 'Google Inc'),))}
695 fail(cert, 'mail.google.com')
696
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200697 # No DNS entry in subjectAltName but a commonName
698 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
699 'subject': ((('countryName', 'US'),),
700 (('stateOrProvinceName', 'California'),),
701 (('localityName', 'Mountain View'),),
702 (('commonName', 'mail.google.com'),)),
703 'subjectAltName': (('othername', 'blabla'), )}
704 ok(cert, 'mail.google.com')
705
706 # No DNS entry subjectAltName and no commonName
707 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
708 'subject': ((('countryName', 'US'),),
709 (('stateOrProvinceName', 'California'),),
710 (('localityName', 'Mountain View'),),
711 (('organizationName', 'Google Inc'),)),
712 'subjectAltName': (('othername', 'blabla'),)}
713 fail(cert, 'google.com')
714
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000715 # Empty cert / no cert
716 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
717 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
718
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200719 # Issue #17980: avoid denials of service by refusing more than one
720 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100721 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
722 with self.assertRaisesRegex(
723 ssl.CertificateError,
724 "partial wildcards in leftmost label are not supported"):
725 ssl.match_hostname(cert, 'axxb.example.com')
726
727 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
728 with self.assertRaisesRegex(
729 ssl.CertificateError,
730 "wildcard can only be present in the leftmost label"):
731 ssl.match_hostname(cert, 'www.sub.example.com')
732
733 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
734 with self.assertRaisesRegex(
735 ssl.CertificateError,
736 "too many wildcards"):
737 ssl.match_hostname(cert, 'axxbxxc.example.com')
738
739 cert = {'subject': ((('commonName', '*'),),)}
740 with self.assertRaisesRegex(
741 ssl.CertificateError,
742 "sole wildcard without additional labels are not support"):
743 ssl.match_hostname(cert, 'host')
744
745 cert = {'subject': ((('commonName', '*.com'),),)}
746 with self.assertRaisesRegex(
747 ssl.CertificateError,
748 r"hostname 'com' doesn't match '\*.com'"):
749 ssl.match_hostname(cert, 'com')
750
751 # extra checks for _inet_paton()
752 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
753 with self.assertRaises(ValueError):
754 ssl._inet_paton(invalid)
755 for ipaddr in ['127.0.0.1', '192.168.0.1']:
756 self.assertTrue(ssl._inet_paton(ipaddr))
757 if hasattr(socket, 'AF_INET6'):
758 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
759 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200760
Antoine Pitroud5323212010-10-22 18:19:07 +0000761 def test_server_side(self):
762 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200763 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000764 with socket.socket() as sock:
765 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
766 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000767
Antoine Pitroud6494802011-07-21 01:11:30 +0200768 def test_unknown_channel_binding(self):
769 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200770 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200771 c = socket.socket(socket.AF_INET)
772 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200773 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100774 with self.assertRaises(ValueError):
775 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200776 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200777
778 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
779 "'tls-unique' channel binding not available")
780 def test_tls_unique_channel_binding(self):
781 # unconnected should return None for known type
782 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200783 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100784 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200785 # the same for server-side
786 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200787 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100788 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200789
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600790 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200791 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600792 r = repr(ss)
793 with self.assertWarns(ResourceWarning) as cm:
794 ss = None
795 support.gc_collect()
796 self.assertIn(r, str(cm.warning.args[0]))
797
Christian Heimes6d7ad132013-06-09 18:02:55 +0200798 def test_get_default_verify_paths(self):
799 paths = ssl.get_default_verify_paths()
800 self.assertEqual(len(paths), 6)
801 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
802
803 with support.EnvironmentVarGuard() as env:
804 env["SSL_CERT_DIR"] = CAPATH
805 env["SSL_CERT_FILE"] = CERTFILE
806 paths = ssl.get_default_verify_paths()
807 self.assertEqual(paths.cafile, CERTFILE)
808 self.assertEqual(paths.capath, CAPATH)
809
Christian Heimes44109d72013-11-22 01:51:30 +0100810 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
811 def test_enum_certificates(self):
812 self.assertTrue(ssl.enum_certificates("CA"))
813 self.assertTrue(ssl.enum_certificates("ROOT"))
814
815 self.assertRaises(TypeError, ssl.enum_certificates)
816 self.assertRaises(WindowsError, ssl.enum_certificates, "")
817
Christian Heimesc2d65e12013-11-22 16:13:55 +0100818 trust_oids = set()
819 for storename in ("CA", "ROOT"):
820 store = ssl.enum_certificates(storename)
821 self.assertIsInstance(store, list)
822 for element in store:
823 self.assertIsInstance(element, tuple)
824 self.assertEqual(len(element), 3)
825 cert, enc, trust = element
826 self.assertIsInstance(cert, bytes)
827 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
828 self.assertIsInstance(trust, (set, bool))
829 if isinstance(trust, set):
830 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100831
832 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100833 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200834
Christian Heimes46bebee2013-06-09 19:03:31 +0200835 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100836 def test_enum_crls(self):
837 self.assertTrue(ssl.enum_crls("CA"))
838 self.assertRaises(TypeError, ssl.enum_crls)
839 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200840
Christian Heimes44109d72013-11-22 01:51:30 +0100841 crls = ssl.enum_crls("CA")
842 self.assertIsInstance(crls, list)
843 for element in crls:
844 self.assertIsInstance(element, tuple)
845 self.assertEqual(len(element), 2)
846 self.assertIsInstance(element[0], bytes)
847 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200848
Christian Heimes46bebee2013-06-09 19:03:31 +0200849
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100850 def test_asn1object(self):
851 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
852 '1.3.6.1.5.5.7.3.1')
853
854 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
855 self.assertEqual(val, expected)
856 self.assertEqual(val.nid, 129)
857 self.assertEqual(val.shortname, 'serverAuth')
858 self.assertEqual(val.longname, 'TLS Web Server Authentication')
859 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
860 self.assertIsInstance(val, ssl._ASN1Object)
861 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
862
863 val = ssl._ASN1Object.fromnid(129)
864 self.assertEqual(val, expected)
865 self.assertIsInstance(val, ssl._ASN1Object)
866 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100867 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
868 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100869 for i in range(1000):
870 try:
871 obj = ssl._ASN1Object.fromnid(i)
872 except ValueError:
873 pass
874 else:
875 self.assertIsInstance(obj.nid, int)
876 self.assertIsInstance(obj.shortname, str)
877 self.assertIsInstance(obj.longname, str)
878 self.assertIsInstance(obj.oid, (str, type(None)))
879
880 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
881 self.assertEqual(val, expected)
882 self.assertIsInstance(val, ssl._ASN1Object)
883 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
884 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
885 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100886 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
887 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100888
Christian Heimes72d28502013-11-23 13:56:58 +0100889 def test_purpose_enum(self):
890 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
891 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
892 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
893 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
894 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
895 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
896 '1.3.6.1.5.5.7.3.1')
897
898 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
899 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
900 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
901 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
902 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
903 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
904 '1.3.6.1.5.5.7.3.2')
905
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100906 def test_unsupported_dtls(self):
907 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
908 self.addCleanup(s.close)
909 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200910 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100911 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200912 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100913 with self.assertRaises(NotImplementedError) as cx:
914 ctx.wrap_socket(s)
915 self.assertEqual(str(cx.exception), "only stream sockets are supported")
916
Antoine Pitrouc695c952014-04-28 20:57:36 +0200917 def cert_time_ok(self, timestring, timestamp):
918 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
919
920 def cert_time_fail(self, timestring):
921 with self.assertRaises(ValueError):
922 ssl.cert_time_to_seconds(timestring)
923
924 @unittest.skipUnless(utc_offset(),
925 'local time needs to be different from UTC')
926 def test_cert_time_to_seconds_timezone(self):
927 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
928 # results if local timezone is not UTC
929 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
930 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
931
932 def test_cert_time_to_seconds(self):
933 timestring = "Jan 5 09:34:43 2018 GMT"
934 ts = 1515144883.0
935 self.cert_time_ok(timestring, ts)
936 # accept keyword parameter, assert its name
937 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
938 # accept both %e and %d (space or zero generated by strftime)
939 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
940 # case-insensitive
941 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
942 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
943 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
944 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
945 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
946 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
947 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
948 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
949
950 newyear_ts = 1230768000.0
951 # leap seconds
952 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
953 # same timestamp
954 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
955
956 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
957 # allow 60th second (even if it is not a leap second)
958 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
959 # allow 2nd leap second for compatibility with time.strptime()
960 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
961 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
962
Mike53f7a7c2017-12-14 14:04:53 +0300963 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200964 # 99991231235959Z (rfc 5280)
965 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
966
967 @support.run_with_locale('LC_ALL', '')
968 def test_cert_time_to_seconds_locale(self):
969 # `cert_time_to_seconds()` should be locale independent
970
971 def local_february_name():
972 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
973
974 if local_february_name().lower() == 'feb':
975 self.skipTest("locale-specific month name needs to be "
976 "different from C locale")
977
978 # locale-independent
979 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
980 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
981
Martin Panter3840b2a2016-03-27 01:53:46 +0000982 def test_connect_ex_error(self):
983 server = socket.socket(socket.AF_INET)
984 self.addCleanup(server.close)
985 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200986 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000987 cert_reqs=ssl.CERT_REQUIRED)
988 self.addCleanup(s.close)
989 rc = s.connect_ex((HOST, port))
990 # Issue #19919: Windows machines or VMs hosted on Windows
991 # machines sometimes return EWOULDBLOCK.
992 errors = (
993 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
994 errno.EWOULDBLOCK,
995 )
996 self.assertIn(rc, errors)
997
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100998
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999class ContextTests(unittest.TestCase):
1000
1001 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001002 for protocol in PROTOCOLS:
1003 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001004 ctx = ssl.SSLContext()
1005 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001006 self.assertRaises(ValueError, ssl.SSLContext, -1)
1007 self.assertRaises(ValueError, ssl.SSLContext, 42)
1008
1009 def test_protocol(self):
1010 for proto in PROTOCOLS:
1011 ctx = ssl.SSLContext(proto)
1012 self.assertEqual(ctx.protocol, proto)
1013
1014 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001016 ctx.set_ciphers("ALL")
1017 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001018 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001019 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001020
Christian Heimes892d66e2018-01-29 14:10:18 +01001021 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1022 "Test applies only to Python default ciphers")
1023 def test_python_ciphers(self):
1024 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1025 ciphers = ctx.get_ciphers()
1026 for suite in ciphers:
1027 name = suite['name']
1028 self.assertNotIn("PSK", name)
1029 self.assertNotIn("SRP", name)
1030 self.assertNotIn("MD5", name)
1031 self.assertNotIn("RC4", name)
1032 self.assertNotIn("3DES", name)
1033
Christian Heimes25bfcd52016-09-06 00:04:45 +02001034 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1035 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001036 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001037 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001038 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001039 self.assertIn('AES256-GCM-SHA384', names)
1040 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041
Antoine Pitroub5218772010-05-21 09:56:06 +00001042 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001044 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001045 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001046 # SSLContext also enables these by default
1047 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001048 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1049 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001050 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001051 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001052 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001053 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001054 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1055 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001056 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001057 # Ubuntu has OP_NO_SSLv3 forced on by default
1058 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001059 else:
1060 with self.assertRaises(ValueError):
1061 ctx.options = 0
1062
Christian Heimesa170fa12017-09-15 20:27:30 +02001063 def test_verify_mode_protocol(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001065 # Default value
1066 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1067 ctx.verify_mode = ssl.CERT_OPTIONAL
1068 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1069 ctx.verify_mode = ssl.CERT_REQUIRED
1070 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1071 ctx.verify_mode = ssl.CERT_NONE
1072 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1073 with self.assertRaises(TypeError):
1074 ctx.verify_mode = None
1075 with self.assertRaises(ValueError):
1076 ctx.verify_mode = 42
1077
Christian Heimesa170fa12017-09-15 20:27:30 +02001078 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 self.assertFalse(ctx.check_hostname)
1081
1082 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1083 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1084 self.assertTrue(ctx.check_hostname)
1085
Christian Heimes61d478c2018-01-27 15:51:38 +01001086 def test_hostname_checks_common_name(self):
1087 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1088 self.assertTrue(ctx.hostname_checks_common_name)
1089 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1090 ctx.hostname_checks_common_name = True
1091 self.assertTrue(ctx.hostname_checks_common_name)
1092 ctx.hostname_checks_common_name = False
1093 self.assertFalse(ctx.hostname_checks_common_name)
1094 ctx.hostname_checks_common_name = True
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 else:
1097 with self.assertRaises(AttributeError):
1098 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001099
Christian Heimes698dde12018-02-27 11:54:43 +01001100 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1101 "required OpenSSL 1.1.0g")
1102 def test_min_max_version(self):
1103 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001104 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1105 # Fedora override the setting to TLS 1.0.
1106 self.assertIn(
1107 ctx.minimum_version,
Victor Stinner3ef63442019-02-19 18:06:03 +01001108 {ssl.TLSVersion.MINIMUM_SUPPORTED,
1109 # Fedora 29 uses TLS 1.0 by default
1110 ssl.TLSVersion.TLSv1,
1111 # RHEL 8 uses TLS 1.2 by default
1112 ssl.TLSVersion.TLSv1_2}
Christian Heimes698dde12018-02-27 11:54:43 +01001113 )
1114 self.assertEqual(
1115 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1116 )
1117
1118 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1119 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1120 self.assertEqual(
1121 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1122 )
1123 self.assertEqual(
1124 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1125 )
1126
1127 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1128 ctx.maximum_version = ssl.TLSVersion.TLSv1
1129 self.assertEqual(
1130 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1131 )
1132 self.assertEqual(
1133 ctx.maximum_version, ssl.TLSVersion.TLSv1
1134 )
1135
1136 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 self.assertEqual(
1138 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1139 )
1140
1141 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1142 self.assertIn(
1143 ctx.maximum_version,
1144 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1145 )
1146
1147 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1148 self.assertIn(
1149 ctx.minimum_version,
1150 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1151 )
1152
1153 with self.assertRaises(ValueError):
1154 ctx.minimum_version = 42
1155
1156 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1157
1158 self.assertEqual(
1159 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1160 )
1161 self.assertEqual(
1162 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1163 )
1164 with self.assertRaises(ValueError):
1165 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1166 with self.assertRaises(ValueError):
1167 ctx.maximum_version = ssl.TLSVersion.TLSv1
1168
1169
Christian Heimes2427b502013-11-23 11:24:32 +01001170 @unittest.skipUnless(have_verify_flags(),
1171 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001172 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001173 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001174 # default value
1175 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1176 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001177 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1178 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1179 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1180 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1181 ctx.verify_flags = ssl.VERIFY_DEFAULT
1182 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1183 # supports any value
1184 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1185 self.assertEqual(ctx.verify_flags,
1186 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1187 with self.assertRaises(TypeError):
1188 ctx.verify_flags = None
1189
Antoine Pitrou152efa22010-05-16 18:19:27 +00001190 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001191 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001192 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001193 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001194 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1195 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001196 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001197 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001198 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001199 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001200 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001201 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001202 ctx.load_cert_chain(EMPTYCERT)
1203 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001204 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001205 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1206 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1207 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001208 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001209 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001210 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001211 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001212 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001213 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1214 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001215 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001216 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001217 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001218 # Password protected key and cert
1219 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1220 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1221 ctx.load_cert_chain(CERTFILE_PROTECTED,
1222 password=bytearray(KEY_PASSWORD.encode()))
1223 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1224 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1225 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1226 bytearray(KEY_PASSWORD.encode()))
1227 with self.assertRaisesRegex(TypeError, "should be a string"):
1228 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1229 with self.assertRaises(ssl.SSLError):
1230 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1231 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1232 # openssl has a fixed limit on the password buffer.
1233 # PEM_BUFSIZE is generally set to 1kb.
1234 # Return a string larger than this.
1235 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1236 # Password callback
1237 def getpass_unicode():
1238 return KEY_PASSWORD
1239 def getpass_bytes():
1240 return KEY_PASSWORD.encode()
1241 def getpass_bytearray():
1242 return bytearray(KEY_PASSWORD.encode())
1243 def getpass_badpass():
1244 return "badpass"
1245 def getpass_huge():
1246 return b'a' * (1024 * 1024)
1247 def getpass_bad_type():
1248 return 9
1249 def getpass_exception():
1250 raise Exception('getpass error')
1251 class GetPassCallable:
1252 def __call__(self):
1253 return KEY_PASSWORD
1254 def getpass(self):
1255 return KEY_PASSWORD
1256 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1257 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1258 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1259 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1260 ctx.load_cert_chain(CERTFILE_PROTECTED,
1261 password=GetPassCallable().getpass)
1262 with self.assertRaises(ssl.SSLError):
1263 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1264 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1265 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1266 with self.assertRaisesRegex(TypeError, "must return a string"):
1267 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1268 with self.assertRaisesRegex(Exception, "getpass error"):
1269 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1270 # Make sure the password function isn't called if it isn't needed
1271 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001272
1273 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001275 ctx.load_verify_locations(CERTFILE)
1276 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1277 ctx.load_verify_locations(BYTES_CERTFILE)
1278 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1279 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001280 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001281 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001282 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001283 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001284 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001285 ctx.load_verify_locations(BADCERT)
1286 ctx.load_verify_locations(CERTFILE, CAPATH)
1287 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1288
Victor Stinner80f75e62011-01-29 11:31:20 +00001289 # Issue #10989: crash if the second argument type is invalid
1290 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1291
Christian Heimesefff7062013-11-21 03:35:02 +01001292 def test_load_verify_cadata(self):
1293 # test cadata
1294 with open(CAFILE_CACERT) as f:
1295 cacert_pem = f.read()
1296 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1297 with open(CAFILE_NEURONIO) as f:
1298 neuronio_pem = f.read()
1299 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1300
1301 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001302 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001303 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1304 ctx.load_verify_locations(cadata=cacert_pem)
1305 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1306 ctx.load_verify_locations(cadata=neuronio_pem)
1307 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1308 # cert already in hash table
1309 ctx.load_verify_locations(cadata=neuronio_pem)
1310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1311
1312 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001313 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001314 combined = "\n".join((cacert_pem, neuronio_pem))
1315 ctx.load_verify_locations(cadata=combined)
1316 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1317
1318 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001319 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001320 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1321 neuronio_pem, "tail"]
1322 ctx.load_verify_locations(cadata="\n".join(combined))
1323 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1324
1325 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001326 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001327 ctx.load_verify_locations(cadata=cacert_der)
1328 ctx.load_verify_locations(cadata=neuronio_der)
1329 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1330 # cert already in hash table
1331 ctx.load_verify_locations(cadata=cacert_der)
1332 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1333
1334 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001335 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001336 combined = b"".join((cacert_der, neuronio_der))
1337 ctx.load_verify_locations(cadata=combined)
1338 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1339
1340 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001341 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001342 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1343
1344 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1345 ctx.load_verify_locations(cadata="broken")
1346 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1347 ctx.load_verify_locations(cadata=b"broken")
1348
1349
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001350 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001351 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001352 ctx.load_dh_params(DHFILE)
1353 if os.name != 'nt':
1354 ctx.load_dh_params(BYTES_DHFILE)
1355 self.assertRaises(TypeError, ctx.load_dh_params)
1356 self.assertRaises(TypeError, ctx.load_dh_params, None)
1357 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001358 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001359 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001360 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001361 ctx.load_dh_params(CERTFILE)
1362
Antoine Pitroub0182c82010-10-12 20:09:02 +00001363 def test_session_stats(self):
1364 for proto in PROTOCOLS:
1365 ctx = ssl.SSLContext(proto)
1366 self.assertEqual(ctx.session_stats(), {
1367 'number': 0,
1368 'connect': 0,
1369 'connect_good': 0,
1370 'connect_renegotiate': 0,
1371 'accept': 0,
1372 'accept_good': 0,
1373 'accept_renegotiate': 0,
1374 'hits': 0,
1375 'misses': 0,
1376 'timeouts': 0,
1377 'cache_full': 0,
1378 })
1379
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001380 def test_set_default_verify_paths(self):
1381 # There's not much we can do to test that it acts as expected,
1382 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001383 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001384 ctx.set_default_verify_paths()
1385
Antoine Pitrou501da612011-12-21 09:27:41 +01001386 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001387 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001388 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001389 ctx.set_ecdh_curve("prime256v1")
1390 ctx.set_ecdh_curve(b"prime256v1")
1391 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1392 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1393 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1394 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1395
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001396 @needs_sni
1397 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001398 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001399
1400 # set_servername_callback expects a callable, or None
1401 self.assertRaises(TypeError, ctx.set_servername_callback)
1402 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1403 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1404 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1405
1406 def dummycallback(sock, servername, ctx):
1407 pass
1408 ctx.set_servername_callback(None)
1409 ctx.set_servername_callback(dummycallback)
1410
1411 @needs_sni
1412 def test_sni_callback_refcycle(self):
1413 # Reference cycles through the servername callback are detected
1414 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001415 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001416 def dummycallback(sock, servername, ctx, cycle=ctx):
1417 pass
1418 ctx.set_servername_callback(dummycallback)
1419 wr = weakref.ref(ctx)
1420 del ctx, dummycallback
1421 gc.collect()
1422 self.assertIs(wr(), None)
1423
Christian Heimes9a5395a2013-06-17 15:44:12 +02001424 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001426 self.assertEqual(ctx.cert_store_stats(),
1427 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1428 ctx.load_cert_chain(CERTFILE)
1429 self.assertEqual(ctx.cert_store_stats(),
1430 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1431 ctx.load_verify_locations(CERTFILE)
1432 self.assertEqual(ctx.cert_store_stats(),
1433 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001434 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001435 self.assertEqual(ctx.cert_store_stats(),
1436 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1437
1438 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001439 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001440 self.assertEqual(ctx.get_ca_certs(), [])
1441 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1442 ctx.load_verify_locations(CERTFILE)
1443 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001444 # but CAFILE_CACERT is a CA cert
1445 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001446 self.assertEqual(ctx.get_ca_certs(),
1447 [{'issuer': ((('organizationName', 'Root CA'),),
1448 (('organizationalUnitName', 'http://www.cacert.org'),),
1449 (('commonName', 'CA Cert Signing Authority'),),
1450 (('emailAddress', 'support@cacert.org'),)),
1451 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1452 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1453 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001454 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001455 'subject': ((('organizationName', 'Root CA'),),
1456 (('organizationalUnitName', 'http://www.cacert.org'),),
1457 (('commonName', 'CA Cert Signing Authority'),),
1458 (('emailAddress', 'support@cacert.org'),)),
1459 'version': 3}])
1460
Martin Panterb55f8b72016-01-14 12:53:56 +00001461 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001462 pem = f.read()
1463 der = ssl.PEM_cert_to_DER_cert(pem)
1464 self.assertEqual(ctx.get_ca_certs(True), [der])
1465
Christian Heimes72d28502013-11-23 13:56:58 +01001466 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001467 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001468 ctx.load_default_certs()
1469
Christian Heimesa170fa12017-09-15 20:27:30 +02001470 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001471 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1472 ctx.load_default_certs()
1473
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1476
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001478 self.assertRaises(TypeError, ctx.load_default_certs, None)
1479 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1480
Benjamin Peterson91244e02014-10-03 18:17:15 -04001481 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001482 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001483 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001484 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001485 with support.EnvironmentVarGuard() as env:
1486 env["SSL_CERT_DIR"] = CAPATH
1487 env["SSL_CERT_FILE"] = CERTFILE
1488 ctx.load_default_certs()
1489 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1490
Benjamin Peterson91244e02014-10-03 18:17:15 -04001491 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001492 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001493 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001494 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001495 ctx.load_default_certs()
1496 stats = ctx.cert_store_stats()
1497
Christian Heimesa170fa12017-09-15 20:27:30 +02001498 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001499 with support.EnvironmentVarGuard() as env:
1500 env["SSL_CERT_DIR"] = CAPATH
1501 env["SSL_CERT_FILE"] = CERTFILE
1502 ctx.load_default_certs()
1503 stats["x509"] += 1
1504 self.assertEqual(ctx.cert_store_stats(), stats)
1505
Christian Heimes358cfd42016-09-10 22:43:48 +02001506 def _assert_context_options(self, ctx):
1507 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1508 if OP_NO_COMPRESSION != 0:
1509 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1510 OP_NO_COMPRESSION)
1511 if OP_SINGLE_DH_USE != 0:
1512 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1513 OP_SINGLE_DH_USE)
1514 if OP_SINGLE_ECDH_USE != 0:
1515 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1516 OP_SINGLE_ECDH_USE)
1517 if OP_CIPHER_SERVER_PREFERENCE != 0:
1518 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1519 OP_CIPHER_SERVER_PREFERENCE)
1520
Christian Heimes4c05b472013-11-23 15:58:30 +01001521 def test_create_default_context(self):
1522 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001523
Christian Heimesa170fa12017-09-15 20:27:30 +02001524 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001525 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001526 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001527 self._assert_context_options(ctx)
1528
Christian Heimes4c05b472013-11-23 15:58:30 +01001529 with open(SIGNING_CA) as f:
1530 cadata = f.read()
1531 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1532 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001533 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001534 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001535 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001536
1537 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001538 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001539 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001540 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
Christian Heimes67986f92013-11-23 22:43:47 +01001542 def test__create_stdlib_context(self):
1543 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001544 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001545 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001547 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001548
1549 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1550 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1551 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001552 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001553
1554 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001555 cert_reqs=ssl.CERT_REQUIRED,
1556 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001557 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1558 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001559 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001560 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001561
1562 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001563 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001564 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001565 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001566
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001568 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001569 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001570 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571
Christian Heimese82c0342017-09-15 20:29:57 +02001572 # Auto set CERT_REQUIRED
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1576 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001577 ctx.verify_mode = ssl.CERT_REQUIRED
1578 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001579 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001580
Christian Heimese82c0342017-09-15 20:29:57 +02001581 # Changing verify_mode does not affect check_hostname
1582 ctx.check_hostname = False
1583 ctx.verify_mode = ssl.CERT_NONE
1584 ctx.check_hostname = False
1585 self.assertFalse(ctx.check_hostname)
1586 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1587 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588 ctx.check_hostname = True
1589 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001590 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1591
1592 ctx.check_hostname = False
1593 ctx.verify_mode = ssl.CERT_OPTIONAL
1594 ctx.check_hostname = False
1595 self.assertFalse(ctx.check_hostname)
1596 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1597 # keep CERT_OPTIONAL
1598 ctx.check_hostname = True
1599 self.assertTrue(ctx.check_hostname)
1600 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001601
1602 # Cannot set CERT_NONE with check_hostname enabled
1603 with self.assertRaises(ValueError):
1604 ctx.verify_mode = ssl.CERT_NONE
1605 ctx.check_hostname = False
1606 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001607 ctx.verify_mode = ssl.CERT_NONE
1608 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001609
Christian Heimes5fe668c2016-09-12 00:01:11 +02001610 def test_context_client_server(self):
1611 # PROTOCOL_TLS_CLIENT has sane defaults
1612 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1613 self.assertTrue(ctx.check_hostname)
1614 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1615
1616 # PROTOCOL_TLS_SERVER has different but also sane defaults
1617 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1618 self.assertFalse(ctx.check_hostname)
1619 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1620
Christian Heimes4df60f12017-09-15 20:26:05 +02001621 def test_context_custom_class(self):
1622 class MySSLSocket(ssl.SSLSocket):
1623 pass
1624
1625 class MySSLObject(ssl.SSLObject):
1626 pass
1627
1628 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1629 ctx.sslsocket_class = MySSLSocket
1630 ctx.sslobject_class = MySSLObject
1631
1632 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1633 self.assertIsInstance(sock, MySSLSocket)
1634 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1635 self.assertIsInstance(obj, MySSLObject)
1636
Antoine Pitrou152efa22010-05-16 18:19:27 +00001637
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001638class SSLErrorTests(unittest.TestCase):
1639
1640 def test_str(self):
1641 # The str() of a SSLError doesn't include the errno
1642 e = ssl.SSLError(1, "foo")
1643 self.assertEqual(str(e), "foo")
1644 self.assertEqual(e.errno, 1)
1645 # Same for a subclass
1646 e = ssl.SSLZeroReturnError(1, "foo")
1647 self.assertEqual(str(e), "foo")
1648 self.assertEqual(e.errno, 1)
1649
1650 def test_lib_reason(self):
1651 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001652 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001653 with self.assertRaises(ssl.SSLError) as cm:
1654 ctx.load_dh_params(CERTFILE)
1655 self.assertEqual(cm.exception.library, 'PEM')
1656 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1657 s = str(cm.exception)
1658 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1659
1660 def test_subclass(self):
1661 # Check that the appropriate SSLError subclass is raised
1662 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001663 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1664 ctx.check_hostname = False
1665 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001666 with socket.create_server(("127.0.0.1", 0)) as s:
1667 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001668 c.setblocking(False)
1669 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001670 with self.assertRaises(ssl.SSLWantReadError) as cm:
1671 c.do_handshake()
1672 s = str(cm.exception)
1673 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1674 # For compatibility
1675 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1676
1677
Christian Heimes61d478c2018-01-27 15:51:38 +01001678 def test_bad_server_hostname(self):
1679 ctx = ssl.create_default_context()
1680 with self.assertRaises(ValueError):
1681 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1682 server_hostname="")
1683 with self.assertRaises(ValueError):
1684 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1685 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001686 with self.assertRaises(TypeError):
1687 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1688 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001689
1690
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001691class MemoryBIOTests(unittest.TestCase):
1692
1693 def test_read_write(self):
1694 bio = ssl.MemoryBIO()
1695 bio.write(b'foo')
1696 self.assertEqual(bio.read(), b'foo')
1697 self.assertEqual(bio.read(), b'')
1698 bio.write(b'foo')
1699 bio.write(b'bar')
1700 self.assertEqual(bio.read(), b'foobar')
1701 self.assertEqual(bio.read(), b'')
1702 bio.write(b'baz')
1703 self.assertEqual(bio.read(2), b'ba')
1704 self.assertEqual(bio.read(1), b'z')
1705 self.assertEqual(bio.read(1), b'')
1706
1707 def test_eof(self):
1708 bio = ssl.MemoryBIO()
1709 self.assertFalse(bio.eof)
1710 self.assertEqual(bio.read(), b'')
1711 self.assertFalse(bio.eof)
1712 bio.write(b'foo')
1713 self.assertFalse(bio.eof)
1714 bio.write_eof()
1715 self.assertFalse(bio.eof)
1716 self.assertEqual(bio.read(2), b'fo')
1717 self.assertFalse(bio.eof)
1718 self.assertEqual(bio.read(1), b'o')
1719 self.assertTrue(bio.eof)
1720 self.assertEqual(bio.read(), b'')
1721 self.assertTrue(bio.eof)
1722
1723 def test_pending(self):
1724 bio = ssl.MemoryBIO()
1725 self.assertEqual(bio.pending, 0)
1726 bio.write(b'foo')
1727 self.assertEqual(bio.pending, 3)
1728 for i in range(3):
1729 bio.read(1)
1730 self.assertEqual(bio.pending, 3-i-1)
1731 for i in range(3):
1732 bio.write(b'x')
1733 self.assertEqual(bio.pending, i+1)
1734 bio.read()
1735 self.assertEqual(bio.pending, 0)
1736
1737 def test_buffer_types(self):
1738 bio = ssl.MemoryBIO()
1739 bio.write(b'foo')
1740 self.assertEqual(bio.read(), b'foo')
1741 bio.write(bytearray(b'bar'))
1742 self.assertEqual(bio.read(), b'bar')
1743 bio.write(memoryview(b'baz'))
1744 self.assertEqual(bio.read(), b'baz')
1745
1746 def test_error_types(self):
1747 bio = ssl.MemoryBIO()
1748 self.assertRaises(TypeError, bio.write, 'foo')
1749 self.assertRaises(TypeError, bio.write, None)
1750 self.assertRaises(TypeError, bio.write, True)
1751 self.assertRaises(TypeError, bio.write, 1)
1752
1753
Christian Heimes9d50ab52018-02-27 10:17:30 +01001754class SSLObjectTests(unittest.TestCase):
1755 def test_private_init(self):
1756 bio = ssl.MemoryBIO()
1757 with self.assertRaisesRegex(TypeError, "public constructor"):
1758 ssl.SSLObject(bio, bio)
1759
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001760 def test_unwrap(self):
1761 client_ctx, server_ctx, hostname = testing_context()
1762 c_in = ssl.MemoryBIO()
1763 c_out = ssl.MemoryBIO()
1764 s_in = ssl.MemoryBIO()
1765 s_out = ssl.MemoryBIO()
1766 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1767 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1768
1769 # Loop on the handshake for a bit to get it settled
1770 for _ in range(5):
1771 try:
1772 client.do_handshake()
1773 except ssl.SSLWantReadError:
1774 pass
1775 if c_out.pending:
1776 s_in.write(c_out.read())
1777 try:
1778 server.do_handshake()
1779 except ssl.SSLWantReadError:
1780 pass
1781 if s_out.pending:
1782 c_in.write(s_out.read())
1783 # Now the handshakes should be complete (don't raise WantReadError)
1784 client.do_handshake()
1785 server.do_handshake()
1786
1787 # Now if we unwrap one side unilaterally, it should send close-notify
1788 # and raise WantReadError:
1789 with self.assertRaises(ssl.SSLWantReadError):
1790 client.unwrap()
1791
1792 # But server.unwrap() does not raise, because it reads the client's
1793 # close-notify:
1794 s_in.write(c_out.read())
1795 server.unwrap()
1796
1797 # And now that the client gets the server's close-notify, it doesn't
1798 # raise either.
1799 c_in.write(s_out.read())
1800 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001801
Martin Panter3840b2a2016-03-27 01:53:46 +00001802class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001803 """Tests that connect to a simple server running in the background"""
1804
1805 def setUp(self):
1806 server = ThreadedEchoServer(SIGNED_CERTFILE)
1807 self.server_addr = (HOST, server.port)
1808 server.__enter__()
1809 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001810
Antoine Pitrou480a1242010-04-28 21:37:09 +00001811 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001812 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001813 cert_reqs=ssl.CERT_NONE) as s:
1814 s.connect(self.server_addr)
1815 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001816 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001817
Martin Panter3840b2a2016-03-27 01:53:46 +00001818 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001819 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001820 cert_reqs=ssl.CERT_REQUIRED,
1821 ca_certs=SIGNING_CA) as s:
1822 s.connect(self.server_addr)
1823 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001824 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001825
Martin Panter3840b2a2016-03-27 01:53:46 +00001826 def test_connect_fail(self):
1827 # This should fail because we have no verification certs. Connection
1828 # failure crashes ThreadedEchoServer, so run this in an independent
1829 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001830 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001831 cert_reqs=ssl.CERT_REQUIRED)
1832 self.addCleanup(s.close)
1833 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1834 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001835
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001836 def test_connect_ex(self):
1837 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001838 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001839 cert_reqs=ssl.CERT_REQUIRED,
1840 ca_certs=SIGNING_CA)
1841 self.addCleanup(s.close)
1842 self.assertEqual(0, s.connect_ex(self.server_addr))
1843 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001844
1845 def test_non_blocking_connect_ex(self):
1846 # Issue #11326: non-blocking connect_ex() should allow handshake
1847 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001848 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001849 cert_reqs=ssl.CERT_REQUIRED,
1850 ca_certs=SIGNING_CA,
1851 do_handshake_on_connect=False)
1852 self.addCleanup(s.close)
1853 s.setblocking(False)
1854 rc = s.connect_ex(self.server_addr)
1855 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1856 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1857 # Wait for connect to finish
1858 select.select([], [s], [], 5.0)
1859 # Non-blocking handshake
1860 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001861 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001862 s.do_handshake()
1863 break
1864 except ssl.SSLWantReadError:
1865 select.select([s], [], [], 5.0)
1866 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001867 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001868 # SSL established
1869 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001870
Antoine Pitrou152efa22010-05-16 18:19:27 +00001871 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001873 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001874 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1875 s.connect(self.server_addr)
1876 self.assertEqual({}, s.getpeercert())
1877 # Same with a server hostname
1878 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1879 server_hostname="dummy") as s:
1880 s.connect(self.server_addr)
1881 ctx.verify_mode = ssl.CERT_REQUIRED
1882 # This should succeed because we specify the root cert
1883 ctx.load_verify_locations(SIGNING_CA)
1884 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1885 s.connect(self.server_addr)
1886 cert = s.getpeercert()
1887 self.assertTrue(cert)
1888
1889 def test_connect_with_context_fail(self):
1890 # This should fail because we have no verification certs. Connection
1891 # failure crashes ThreadedEchoServer, so run this in an independent
1892 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001893 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001894 ctx.verify_mode = ssl.CERT_REQUIRED
1895 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1896 self.addCleanup(s.close)
1897 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1898 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001899
1900 def test_connect_capath(self):
1901 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001902 # NOTE: the subject hashing algorithm has been changed between
1903 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1904 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001905 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001906 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001907 ctx.verify_mode = ssl.CERT_REQUIRED
1908 ctx.load_verify_locations(capath=CAPATH)
1909 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1910 s.connect(self.server_addr)
1911 cert = s.getpeercert()
1912 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001913
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001915 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001916 ctx.verify_mode = ssl.CERT_REQUIRED
1917 ctx.load_verify_locations(capath=BYTES_CAPATH)
1918 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1919 s.connect(self.server_addr)
1920 cert = s.getpeercert()
1921 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001922
Christian Heimesefff7062013-11-21 03:35:02 +01001923 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001925 pem = f.read()
1926 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001927 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001928 ctx.verify_mode = ssl.CERT_REQUIRED
1929 ctx.load_verify_locations(cadata=pem)
1930 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1931 s.connect(self.server_addr)
1932 cert = s.getpeercert()
1933 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001934
Martin Panter3840b2a2016-03-27 01:53:46 +00001935 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001936 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001937 ctx.verify_mode = ssl.CERT_REQUIRED
1938 ctx.load_verify_locations(cadata=der)
1939 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1940 s.connect(self.server_addr)
1941 cert = s.getpeercert()
1942 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001943
Antoine Pitroue3220242010-04-24 11:13:53 +00001944 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1945 def test_makefile_close(self):
1946 # Issue #5238: creating a file-like object with makefile() shouldn't
1947 # delay closing the underlying "real socket" (here tested with its
1948 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001949 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001950 ss.connect(self.server_addr)
1951 fd = ss.fileno()
1952 f = ss.makefile()
1953 f.close()
1954 # The fd is still open
1955 os.read(fd, 0)
1956 # Closing the SSL socket should close the fd too
1957 ss.close()
1958 gc.collect()
1959 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001960 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001961 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001962
Antoine Pitrou480a1242010-04-28 21:37:09 +00001963 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001964 s = socket.socket(socket.AF_INET)
1965 s.connect(self.server_addr)
1966 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001967 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001968 cert_reqs=ssl.CERT_NONE,
1969 do_handshake_on_connect=False)
1970 self.addCleanup(s.close)
1971 count = 0
1972 while True:
1973 try:
1974 count += 1
1975 s.do_handshake()
1976 break
1977 except ssl.SSLWantReadError:
1978 select.select([s], [], [])
1979 except ssl.SSLWantWriteError:
1980 select.select([], [s], [])
1981 if support.verbose:
1982 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001983
Antoine Pitrou480a1242010-04-28 21:37:09 +00001984 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001986
Martin Panter3840b2a2016-03-27 01:53:46 +00001987 def test_get_server_certificate_fail(self):
1988 # Connection failure crashes ThreadedEchoServer, so run this in an
1989 # independent test method
1990 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001991
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001992 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001993 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001994 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1995 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001996 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001997 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1998 s.connect(self.server_addr)
1999 # Error checking can happen at instantiation or when connecting
2000 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2001 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002002 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002003 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2004 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002005
Christian Heimes9a5395a2013-06-17 15:44:12 +02002006 def test_get_ca_certs_capath(self):
2007 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002009 ctx.load_verify_locations(capath=CAPATH)
2010 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002011 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2012 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002013 s.connect(self.server_addr)
2014 cert = s.getpeercert()
2015 self.assertTrue(cert)
2016 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002017
Christian Heimes575596e2013-12-15 21:49:17 +01002018 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002019 def test_context_setget(self):
2020 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002021 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2022 ctx1.load_verify_locations(capath=CAPATH)
2023 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2024 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002025 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002026 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002027 ss.connect(self.server_addr)
2028 self.assertIs(ss.context, ctx1)
2029 self.assertIs(ss._sslobj.context, ctx1)
2030 ss.context = ctx2
2031 self.assertIs(ss.context, ctx2)
2032 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002033
2034 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2035 # A simple IO loop. Call func(*args) depending on the error we get
2036 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2037 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002038 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002039 count = 0
2040 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002041 if time.monotonic() > deadline:
2042 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002043 errno = None
2044 count += 1
2045 try:
2046 ret = func(*args)
2047 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002048 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002049 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002050 raise
2051 errno = e.errno
2052 # Get any data from the outgoing BIO irrespective of any error, and
2053 # send it to the socket.
2054 buf = outgoing.read()
2055 sock.sendall(buf)
2056 # If there's no error, we're done. For WANT_READ, we need to get
2057 # data from the socket and put it in the incoming BIO.
2058 if errno is None:
2059 break
2060 elif errno == ssl.SSL_ERROR_WANT_READ:
2061 buf = sock.recv(32768)
2062 if buf:
2063 incoming.write(buf)
2064 else:
2065 incoming.write_eof()
2066 if support.verbose:
2067 sys.stdout.write("Needed %d calls to complete %s().\n"
2068 % (count, func.__name__))
2069 return ret
2070
Martin Panter3840b2a2016-03-27 01:53:46 +00002071 def test_bio_handshake(self):
2072 sock = socket.socket(socket.AF_INET)
2073 self.addCleanup(sock.close)
2074 sock.connect(self.server_addr)
2075 incoming = ssl.MemoryBIO()
2076 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002077 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2078 self.assertTrue(ctx.check_hostname)
2079 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002080 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002081 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2082 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002083 self.assertIs(sslobj._sslobj.owner, sslobj)
2084 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002085 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002086 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002087 self.assertRaises(ValueError, sslobj.getpeercert)
2088 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2089 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2090 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2091 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002092 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002093 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002094 self.assertTrue(sslobj.getpeercert())
2095 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2096 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2097 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002098 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002099 except ssl.SSLSyscallError:
2100 # If the server shuts down the TCP connection without sending a
2101 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2102 pass
2103 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2104
2105 def test_bio_read_write_data(self):
2106 sock = socket.socket(socket.AF_INET)
2107 self.addCleanup(sock.close)
2108 sock.connect(self.server_addr)
2109 incoming = ssl.MemoryBIO()
2110 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002111 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002112 ctx.verify_mode = ssl.CERT_NONE
2113 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2114 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2115 req = b'FOO\n'
2116 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2117 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2118 self.assertEqual(buf, b'foo\n')
2119 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002120
2121
Martin Panter3840b2a2016-03-27 01:53:46 +00002122class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002123
Martin Panter3840b2a2016-03-27 01:53:46 +00002124 def test_timeout_connect_ex(self):
2125 # Issue #12065: on a timeout, connect_ex() should return the original
2126 # errno (mimicking the behaviour of non-SSL sockets).
2127 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002128 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002129 cert_reqs=ssl.CERT_REQUIRED,
2130 do_handshake_on_connect=False)
2131 self.addCleanup(s.close)
2132 s.settimeout(0.0000001)
2133 rc = s.connect_ex((REMOTE_HOST, 443))
2134 if rc == 0:
2135 self.skipTest("REMOTE_HOST responded too quickly")
2136 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2137
2138 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2139 def test_get_server_certificate_ipv6(self):
2140 with support.transient_internet('ipv6.google.com'):
2141 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2142 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2143
Martin Panter3840b2a2016-03-27 01:53:46 +00002144
2145def _test_get_server_certificate(test, host, port, cert=None):
2146 pem = ssl.get_server_certificate((host, port))
2147 if not pem:
2148 test.fail("No server certificate on %s:%s!" % (host, port))
2149
2150 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2151 if not pem:
2152 test.fail("No server certificate on %s:%s!" % (host, port))
2153 if support.verbose:
2154 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2155
2156def _test_get_server_certificate_fail(test, host, port):
2157 try:
2158 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2159 except ssl.SSLError as x:
2160 #should fail
2161 if support.verbose:
2162 sys.stdout.write("%s\n" % x)
2163 else:
2164 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2165
2166
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002167from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002169class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002170
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002171 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002172
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002173 """A mildly complicated class, because we want it to work both
2174 with and without the SSL wrapper around the socket connection, so
2175 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002176
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002177 def __init__(self, server, connsock, addr):
2178 self.server = server
2179 self.running = False
2180 self.sock = connsock
2181 self.addr = addr
2182 self.sock.setblocking(1)
2183 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002184 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002185 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002186
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002187 def wrap_conn(self):
2188 try:
2189 self.sslconn = self.server.context.wrap_socket(
2190 self.sock, server_side=True)
2191 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2192 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002193 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002194 # We treat ConnectionResetError as though it were an
2195 # SSLError - OpenSSL on Ubuntu abruptly closes the
2196 # connection when asked to use an unsupported protocol.
2197 #
Christian Heimes529525f2018-05-23 22:24:45 +02002198 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2199 # tries to send session tickets after handshake.
2200 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002201 #
2202 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2203 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002204 self.server.conn_errors.append(str(e))
2205 if self.server.chatty:
2206 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2207 self.running = False
2208 self.close()
2209 return False
2210 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002211 # OSError may occur with wrong protocols, e.g. both
2212 # sides use PROTOCOL_TLS_SERVER.
2213 #
2214 # XXX Various errors can have happened here, for example
2215 # a mismatching protocol version, an invalid certificate,
2216 # or a low-level bug. This should be made more discriminating.
2217 #
2218 # bpo-31323: Store the exception as string to prevent
2219 # a reference leak: server -> conn_errors -> exception
2220 # -> traceback -> self (ConnectionHandler) -> server
2221 self.server.conn_errors.append(str(e))
2222 if self.server.chatty:
2223 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2224 self.running = False
2225 self.server.stop()
2226 self.close()
2227 return False
2228 else:
2229 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2230 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2231 cert = self.sslconn.getpeercert()
2232 if support.verbose and self.server.chatty:
2233 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2234 cert_binary = self.sslconn.getpeercert(True)
2235 if support.verbose and self.server.chatty:
2236 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2237 cipher = self.sslconn.cipher()
2238 if support.verbose and self.server.chatty:
2239 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2240 sys.stdout.write(" server: selected protocol is now "
2241 + str(self.sslconn.selected_npn_protocol()) + "\n")
2242 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002243
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002244 def read(self):
2245 if self.sslconn:
2246 return self.sslconn.read()
2247 else:
2248 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002249
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002250 def write(self, bytes):
2251 if self.sslconn:
2252 return self.sslconn.write(bytes)
2253 else:
2254 return self.sock.send(bytes)
2255
2256 def close(self):
2257 if self.sslconn:
2258 self.sslconn.close()
2259 else:
2260 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002261
Antoine Pitrou480a1242010-04-28 21:37:09 +00002262 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002263 self.running = True
2264 if not self.server.starttls_server:
2265 if not self.wrap_conn():
2266 return
2267 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002268 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002269 msg = self.read()
2270 stripped = msg.strip()
2271 if not stripped:
2272 # eof, so quit this handler
2273 self.running = False
2274 try:
2275 self.sock = self.sslconn.unwrap()
2276 except OSError:
2277 # Many tests shut the TCP connection down
2278 # without an SSL shutdown. This causes
2279 # unwrap() to raise OSError with errno=0!
2280 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002281 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002282 self.sslconn = None
2283 self.close()
2284 elif stripped == b'over':
2285 if support.verbose and self.server.connectionchatty:
2286 sys.stdout.write(" server: client closed connection\n")
2287 self.close()
2288 return
2289 elif (self.server.starttls_server and
2290 stripped == b'STARTTLS'):
2291 if support.verbose and self.server.connectionchatty:
2292 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2293 self.write(b"OK\n")
2294 if not self.wrap_conn():
2295 return
2296 elif (self.server.starttls_server and self.sslconn
2297 and stripped == b'ENDTLS'):
2298 if support.verbose and self.server.connectionchatty:
2299 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2300 self.write(b"OK\n")
2301 self.sock = self.sslconn.unwrap()
2302 self.sslconn = None
2303 if support.verbose and self.server.connectionchatty:
2304 sys.stdout.write(" server: connection is now unencrypted...\n")
2305 elif stripped == b'CB tls-unique':
2306 if support.verbose and self.server.connectionchatty:
2307 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2308 data = self.sslconn.get_channel_binding("tls-unique")
2309 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002310 elif stripped == b'PHA':
2311 if support.verbose and self.server.connectionchatty:
2312 sys.stdout.write(" server: initiating post handshake auth\n")
2313 try:
2314 self.sslconn.verify_client_post_handshake()
2315 except ssl.SSLError as e:
2316 self.write(repr(e).encode("us-ascii") + b"\n")
2317 else:
2318 self.write(b"OK\n")
2319 elif stripped == b'HASCERT':
2320 if self.sslconn.getpeercert() is not None:
2321 self.write(b'TRUE\n')
2322 else:
2323 self.write(b'FALSE\n')
2324 elif stripped == b'GETCERT':
2325 cert = self.sslconn.getpeercert()
2326 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 else:
2328 if (support.verbose and
2329 self.server.connectionchatty):
2330 ctype = (self.sslconn and "encrypted") or "unencrypted"
2331 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2332 % (msg, ctype, msg.lower(), ctype))
2333 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002334 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002335 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2336 # when connection is not shut down gracefully.
2337 if self.server.chatty and support.verbose:
2338 sys.stdout.write(
2339 " Connection reset by peer: {}\n".format(
2340 self.addr)
2341 )
2342 self.close()
2343 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002344 except ssl.SSLError as err:
2345 # On Windows sometimes test_pha_required_nocert receives the
2346 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2347 # before the 'tlsv13 alert certificate required' exception.
2348 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2349 # is received test_pha_required_nocert fails with ConnectionResetError
2350 # because the underlying socket is closed
2351 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2352 if self.server.chatty and support.verbose:
2353 sys.stdout.write(err.args[1])
2354 # test_pha_required_nocert is expecting this exception
2355 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002356 except OSError:
2357 if self.server.chatty:
2358 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002359 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002360 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002361
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002362 # normally, we'd just stop here, but for the test
2363 # harness, we want to stop the server
2364 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002365
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002366 def __init__(self, certificate=None, ssl_version=None,
2367 certreqs=None, cacerts=None,
2368 chatty=True, connectionchatty=False, starttls_server=False,
2369 npn_protocols=None, alpn_protocols=None,
2370 ciphers=None, context=None):
2371 if context:
2372 self.context = context
2373 else:
2374 self.context = ssl.SSLContext(ssl_version
2375 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002376 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002377 self.context.verify_mode = (certreqs if certreqs is not None
2378 else ssl.CERT_NONE)
2379 if cacerts:
2380 self.context.load_verify_locations(cacerts)
2381 if certificate:
2382 self.context.load_cert_chain(certificate)
2383 if npn_protocols:
2384 self.context.set_npn_protocols(npn_protocols)
2385 if alpn_protocols:
2386 self.context.set_alpn_protocols(alpn_protocols)
2387 if ciphers:
2388 self.context.set_ciphers(ciphers)
2389 self.chatty = chatty
2390 self.connectionchatty = connectionchatty
2391 self.starttls_server = starttls_server
2392 self.sock = socket.socket()
2393 self.port = support.bind_port(self.sock)
2394 self.flag = None
2395 self.active = False
2396 self.selected_npn_protocols = []
2397 self.selected_alpn_protocols = []
2398 self.shared_ciphers = []
2399 self.conn_errors = []
2400 threading.Thread.__init__(self)
2401 self.daemon = True
2402
2403 def __enter__(self):
2404 self.start(threading.Event())
2405 self.flag.wait()
2406 return self
2407
2408 def __exit__(self, *args):
2409 self.stop()
2410 self.join()
2411
2412 def start(self, flag=None):
2413 self.flag = flag
2414 threading.Thread.start(self)
2415
2416 def run(self):
2417 self.sock.settimeout(0.05)
2418 self.sock.listen()
2419 self.active = True
2420 if self.flag:
2421 # signal an event
2422 self.flag.set()
2423 while self.active:
2424 try:
2425 newconn, connaddr = self.sock.accept()
2426 if support.verbose and self.chatty:
2427 sys.stdout.write(' server: new connection from '
2428 + repr(connaddr) + '\n')
2429 handler = self.ConnectionHandler(self, newconn, connaddr)
2430 handler.start()
2431 handler.join()
2432 except socket.timeout:
2433 pass
2434 except KeyboardInterrupt:
2435 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002436 except BaseException as e:
2437 if support.verbose and self.chatty:
2438 sys.stdout.write(
2439 ' connection handling failed: ' + repr(e) + '\n')
2440
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 self.sock.close()
2442
2443 def stop(self):
2444 self.active = False
2445
2446class AsyncoreEchoServer(threading.Thread):
2447
2448 # this one's based on asyncore.dispatcher
2449
2450 class EchoServer (asyncore.dispatcher):
2451
2452 class ConnectionHandler(asyncore.dispatcher_with_send):
2453
2454 def __init__(self, conn, certfile):
2455 self.socket = test_wrap_socket(conn, server_side=True,
2456 certfile=certfile,
2457 do_handshake_on_connect=False)
2458 asyncore.dispatcher_with_send.__init__(self, self.socket)
2459 self._ssl_accepting = True
2460 self._do_ssl_handshake()
2461
2462 def readable(self):
2463 if isinstance(self.socket, ssl.SSLSocket):
2464 while self.socket.pending() > 0:
2465 self.handle_read_event()
2466 return True
2467
2468 def _do_ssl_handshake(self):
2469 try:
2470 self.socket.do_handshake()
2471 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2472 return
2473 except ssl.SSLEOFError:
2474 return self.handle_close()
2475 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002476 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002477 except OSError as err:
2478 if err.args[0] == errno.ECONNABORTED:
2479 return self.handle_close()
2480 else:
2481 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002482
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 def handle_read(self):
2484 if self._ssl_accepting:
2485 self._do_ssl_handshake()
2486 else:
2487 data = self.recv(1024)
2488 if support.verbose:
2489 sys.stdout.write(" server: read %s from client\n" % repr(data))
2490 if not data:
2491 self.close()
2492 else:
2493 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002494
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002495 def handle_close(self):
2496 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002497 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002498 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002499
2500 def handle_error(self):
2501 raise
2502
Trent Nelson78520002008-04-10 20:54:35 +00002503 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002504 self.certfile = certfile
2505 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2506 self.port = support.bind_port(sock, '')
2507 asyncore.dispatcher.__init__(self, sock)
2508 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002509
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002510 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002511 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002512 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2513 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002514
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002515 def handle_error(self):
2516 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002517
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002518 def __init__(self, certfile):
2519 self.flag = None
2520 self.active = False
2521 self.server = self.EchoServer(certfile)
2522 self.port = self.server.port
2523 threading.Thread.__init__(self)
2524 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002525
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002526 def __str__(self):
2527 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002528
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 def __enter__(self):
2530 self.start(threading.Event())
2531 self.flag.wait()
2532 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002533
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002534 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002535 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002536 sys.stdout.write(" cleanup: stopping server.\n")
2537 self.stop()
2538 if support.verbose:
2539 sys.stdout.write(" cleanup: joining server thread.\n")
2540 self.join()
2541 if support.verbose:
2542 sys.stdout.write(" cleanup: successfully joined.\n")
2543 # make sure that ConnectionHandler is removed from socket_map
2544 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002545
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002546 def start (self, flag=None):
2547 self.flag = flag
2548 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002549
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002550 def run(self):
2551 self.active = True
2552 if self.flag:
2553 self.flag.set()
2554 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002555 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002556 asyncore.loop(1)
2557 except:
2558 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002559
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002560 def stop(self):
2561 self.active = False
2562 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002563
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002564def server_params_test(client_context, server_context, indata=b"FOO\n",
2565 chatty=True, connectionchatty=False, sni_name=None,
2566 session=None):
2567 """
2568 Launch a server, connect a client to it and try various reads
2569 and writes.
2570 """
2571 stats = {}
2572 server = ThreadedEchoServer(context=server_context,
2573 chatty=chatty,
2574 connectionchatty=False)
2575 with server:
2576 with client_context.wrap_socket(socket.socket(),
2577 server_hostname=sni_name, session=session) as s:
2578 s.connect((HOST, server.port))
2579 for arg in [indata, bytearray(indata), memoryview(indata)]:
2580 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002581 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002582 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002583 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002584 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002585 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002586 if connectionchatty:
2587 if support.verbose:
2588 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002589 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002590 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002591 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2592 % (outdata[:20], len(outdata),
2593 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 s.write(b"over\n")
2595 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002596 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002597 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002598 stats.update({
2599 'compression': s.compression(),
2600 'cipher': s.cipher(),
2601 'peercert': s.getpeercert(),
2602 'client_alpn_protocol': s.selected_alpn_protocol(),
2603 'client_npn_protocol': s.selected_npn_protocol(),
2604 'version': s.version(),
2605 'session_reused': s.session_reused,
2606 'session': s.session,
2607 })
2608 s.close()
2609 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2610 stats['server_npn_protocols'] = server.selected_npn_protocols
2611 stats['server_shared_ciphers'] = server.shared_ciphers
2612 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002613
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002614def try_protocol_combo(server_protocol, client_protocol, expect_success,
2615 certsreqs=None, server_options=0, client_options=0):
2616 """
2617 Try to SSL-connect using *client_protocol* to *server_protocol*.
2618 If *expect_success* is true, assert that the connection succeeds,
2619 if it's false, assert that the connection fails.
2620 Also, if *expect_success* is a string, assert that it is the protocol
2621 version actually used by the connection.
2622 """
2623 if certsreqs is None:
2624 certsreqs = ssl.CERT_NONE
2625 certtype = {
2626 ssl.CERT_NONE: "CERT_NONE",
2627 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2628 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2629 }[certsreqs]
2630 if support.verbose:
2631 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2632 sys.stdout.write(formatstr %
2633 (ssl.get_protocol_name(client_protocol),
2634 ssl.get_protocol_name(server_protocol),
2635 certtype))
2636 client_context = ssl.SSLContext(client_protocol)
2637 client_context.options |= client_options
2638 server_context = ssl.SSLContext(server_protocol)
2639 server_context.options |= server_options
2640
Victor Stinner3ef63442019-02-19 18:06:03 +01002641 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2642 if (min_version is not None
2643 # SSLContext.minimum_version is only available on recent OpenSSL
2644 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2645 and hasattr(server_context, 'minimum_version')
2646 and server_protocol == ssl.PROTOCOL_TLS
2647 and server_context.minimum_version > min_version):
2648 # If OpenSSL configuration is strict and requires more recent TLS
2649 # version, we have to change the minimum to test old TLS versions.
2650 server_context.minimum_version = min_version
2651
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002652 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2653 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2654 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002655 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002656 client_context.set_ciphers("ALL")
2657
2658 for ctx in (client_context, server_context):
2659 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002660 ctx.load_cert_chain(SIGNED_CERTFILE)
2661 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002662 try:
2663 stats = server_params_test(client_context, server_context,
2664 chatty=False, connectionchatty=False)
2665 # Protocol mismatch can result in either an SSLError, or a
2666 # "Connection reset by peer" error.
2667 except ssl.SSLError:
2668 if expect_success:
2669 raise
2670 except OSError as e:
2671 if expect_success or e.errno != errno.ECONNRESET:
2672 raise
2673 else:
2674 if not expect_success:
2675 raise AssertionError(
2676 "Client protocol %s succeeded with server protocol %s!"
2677 % (ssl.get_protocol_name(client_protocol),
2678 ssl.get_protocol_name(server_protocol)))
2679 elif (expect_success is not True
2680 and expect_success != stats['version']):
2681 raise AssertionError("version mismatch: expected %r, got %r"
2682 % (expect_success, stats['version']))
2683
2684
2685class ThreadedTests(unittest.TestCase):
2686
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687 def test_echo(self):
2688 """Basic test of an SSL client connecting to a server"""
2689 if support.verbose:
2690 sys.stdout.write("\n")
2691 for protocol in PROTOCOLS:
2692 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2693 continue
2694 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2695 context = ssl.SSLContext(protocol)
2696 context.load_cert_chain(CERTFILE)
2697 server_params_test(context, context,
2698 chatty=True, connectionchatty=True)
2699
Christian Heimesa170fa12017-09-15 20:27:30 +02002700 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002701
2702 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2703 server_params_test(client_context=client_context,
2704 server_context=server_context,
2705 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002706 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002707
2708 client_context.check_hostname = False
2709 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2710 with self.assertRaises(ssl.SSLError) as e:
2711 server_params_test(client_context=server_context,
2712 server_context=client_context,
2713 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002714 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002715 self.assertIn('called a function you should not call',
2716 str(e.exception))
2717
2718 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2719 with self.assertRaises(ssl.SSLError) as e:
2720 server_params_test(client_context=server_context,
2721 server_context=server_context,
2722 chatty=True, connectionchatty=True)
2723 self.assertIn('called a function you should not call',
2724 str(e.exception))
2725
2726 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2727 with self.assertRaises(ssl.SSLError) as e:
2728 server_params_test(client_context=server_context,
2729 server_context=client_context,
2730 chatty=True, connectionchatty=True)
2731 self.assertIn('called a function you should not call',
2732 str(e.exception))
2733
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002734 def test_getpeercert(self):
2735 if support.verbose:
2736 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002737
2738 client_context, server_context, hostname = testing_context()
2739 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002740 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002741 with client_context.wrap_socket(socket.socket(),
2742 do_handshake_on_connect=False,
2743 server_hostname=hostname) as s:
2744 s.connect((HOST, server.port))
2745 # getpeercert() raise ValueError while the handshake isn't
2746 # done.
2747 with self.assertRaises(ValueError):
2748 s.getpeercert()
2749 s.do_handshake()
2750 cert = s.getpeercert()
2751 self.assertTrue(cert, "Can't get peer certificate.")
2752 cipher = s.cipher()
2753 if support.verbose:
2754 sys.stdout.write(pprint.pformat(cert) + '\n')
2755 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2756 if 'subject' not in cert:
2757 self.fail("No subject field in certificate: %s." %
2758 pprint.pformat(cert))
2759 if ((('organizationName', 'Python Software Foundation'),)
2760 not in cert['subject']):
2761 self.fail(
2762 "Missing or invalid 'organizationName' field in certificate subject; "
2763 "should be 'Python Software Foundation'.")
2764 self.assertIn('notBefore', cert)
2765 self.assertIn('notAfter', cert)
2766 before = ssl.cert_time_to_seconds(cert['notBefore'])
2767 after = ssl.cert_time_to_seconds(cert['notAfter'])
2768 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002769
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002770 @unittest.skipUnless(have_verify_flags(),
2771 "verify_flags need OpenSSL > 0.9.8")
2772 def test_crl_check(self):
2773 if support.verbose:
2774 sys.stdout.write("\n")
2775
Christian Heimesa170fa12017-09-15 20:27:30 +02002776 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002777
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002778 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002779 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780
2781 # VERIFY_DEFAULT should pass
2782 server = ThreadedEchoServer(context=server_context, chatty=True)
2783 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002784 with client_context.wrap_socket(socket.socket(),
2785 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002786 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002787 cert = s.getpeercert()
2788 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002789
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002790 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002791 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002792
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002793 server = ThreadedEchoServer(context=server_context, chatty=True)
2794 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002795 with client_context.wrap_socket(socket.socket(),
2796 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002797 with self.assertRaisesRegex(ssl.SSLError,
2798 "certificate verify failed"):
2799 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002800
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002801 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002802 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002803
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002804 server = ThreadedEchoServer(context=server_context, chatty=True)
2805 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002806 with client_context.wrap_socket(socket.socket(),
2807 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002808 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002809 cert = s.getpeercert()
2810 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002811
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002812 def test_check_hostname(self):
2813 if support.verbose:
2814 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002815
Christian Heimesa170fa12017-09-15 20:27:30 +02002816 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002817
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002818 # correct hostname should verify
2819 server = ThreadedEchoServer(context=server_context, chatty=True)
2820 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002821 with client_context.wrap_socket(socket.socket(),
2822 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002823 s.connect((HOST, server.port))
2824 cert = s.getpeercert()
2825 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002826
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002827 # incorrect hostname should raise an exception
2828 server = ThreadedEchoServer(context=server_context, chatty=True)
2829 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002830 with client_context.wrap_socket(socket.socket(),
2831 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002832 with self.assertRaisesRegex(
2833 ssl.CertificateError,
2834 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002835 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002836
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002837 # missing server_hostname arg should cause an exception, too
2838 server = ThreadedEchoServer(context=server_context, chatty=True)
2839 with server:
2840 with socket.socket() as s:
2841 with self.assertRaisesRegex(ValueError,
2842 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002843 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002844
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002845 def test_ecc_cert(self):
2846 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2847 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002848 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002849 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2850
2851 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2852 # load ECC cert
2853 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2854
2855 # correct hostname should verify
2856 server = ThreadedEchoServer(context=server_context, chatty=True)
2857 with server:
2858 with client_context.wrap_socket(socket.socket(),
2859 server_hostname=hostname) as s:
2860 s.connect((HOST, server.port))
2861 cert = s.getpeercert()
2862 self.assertTrue(cert, "Can't get peer certificate.")
2863 cipher = s.cipher()[0].split('-')
2864 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2865
2866 def test_dual_rsa_ecc(self):
2867 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2868 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002869 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2870 # algorithms.
2871 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002872 # only ECDSA certs
2873 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2874 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2875
2876 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2877 # load ECC and RSA key/cert pairs
2878 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2879 server_context.load_cert_chain(SIGNED_CERTFILE)
2880
2881 # correct hostname should verify
2882 server = ThreadedEchoServer(context=server_context, chatty=True)
2883 with server:
2884 with client_context.wrap_socket(socket.socket(),
2885 server_hostname=hostname) as s:
2886 s.connect((HOST, server.port))
2887 cert = s.getpeercert()
2888 self.assertTrue(cert, "Can't get peer certificate.")
2889 cipher = s.cipher()[0].split('-')
2890 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2891
Christian Heimes66e57422018-01-29 14:25:13 +01002892 def test_check_hostname_idn(self):
2893 if support.verbose:
2894 sys.stdout.write("\n")
2895
Christian Heimes11a14932018-02-24 02:35:08 +01002896 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002897 server_context.load_cert_chain(IDNSANSFILE)
2898
Christian Heimes11a14932018-02-24 02:35:08 +01002899 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002900 context.verify_mode = ssl.CERT_REQUIRED
2901 context.check_hostname = True
2902 context.load_verify_locations(SIGNING_CA)
2903
2904 # correct hostname should verify, when specified in several
2905 # different ways
2906 idn_hostnames = [
2907 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002908 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002909 ('xn--knig-5qa.idn.pythontest.net',
2910 'xn--knig-5qa.idn.pythontest.net'),
2911 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002912 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002913
2914 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002915 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002916 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2917 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2918 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002919 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2920
2921 # ('königsgäßchen.idna2008.pythontest.net',
2922 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2923 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2924 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2925 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2926 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2927
Christian Heimes66e57422018-01-29 14:25:13 +01002928 ]
2929 for server_hostname, expected_hostname in idn_hostnames:
2930 server = ThreadedEchoServer(context=server_context, chatty=True)
2931 with server:
2932 with context.wrap_socket(socket.socket(),
2933 server_hostname=server_hostname) as s:
2934 self.assertEqual(s.server_hostname, expected_hostname)
2935 s.connect((HOST, server.port))
2936 cert = s.getpeercert()
2937 self.assertEqual(s.server_hostname, expected_hostname)
2938 self.assertTrue(cert, "Can't get peer certificate.")
2939
Christian Heimes66e57422018-01-29 14:25:13 +01002940 # incorrect hostname should raise an exception
2941 server = ThreadedEchoServer(context=server_context, chatty=True)
2942 with server:
2943 with context.wrap_socket(socket.socket(),
2944 server_hostname="python.example.org") as s:
2945 with self.assertRaises(ssl.CertificateError):
2946 s.connect((HOST, server.port))
2947
Christian Heimes529525f2018-05-23 22:24:45 +02002948 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 """Connecting when the server rejects the client's certificate
2950
2951 Launch a server with CERT_REQUIRED, and check that trying to
2952 connect to it with a wrong client certificate fails.
2953 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002954 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002955 # load client cert that is not signed by trusted CA
2956 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002957 # require TLS client authentication
2958 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002959 # TLS 1.3 has different handshake
2960 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002961
2962 server = ThreadedEchoServer(
2963 context=server_context, chatty=True, connectionchatty=True,
2964 )
2965
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002966 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002967 client_context.wrap_socket(socket.socket(),
2968 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002969 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002970 # Expect either an SSL error about the server rejecting
2971 # the connection, or a low-level connection reset (which
2972 # sometimes happens on Windows)
2973 s.connect((HOST, server.port))
2974 except ssl.SSLError as e:
2975 if support.verbose:
2976 sys.stdout.write("\nSSLError is %r\n" % e)
2977 except OSError as e:
2978 if e.errno != errno.ECONNRESET:
2979 raise
2980 if support.verbose:
2981 sys.stdout.write("\nsocket.error is %r\n" % e)
2982 else:
2983 self.fail("Use of invalid cert should have failed!")
2984
Christian Heimes529525f2018-05-23 22:24:45 +02002985 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2986 def test_wrong_cert_tls13(self):
2987 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002988 # load client cert that is not signed by trusted CA
2989 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02002990 server_context.verify_mode = ssl.CERT_REQUIRED
2991 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2992 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2993
2994 server = ThreadedEchoServer(
2995 context=server_context, chatty=True, connectionchatty=True,
2996 )
2997 with server, \
2998 client_context.wrap_socket(socket.socket(),
2999 server_hostname=hostname) as s:
3000 # TLS 1.3 perform client cert exchange after handshake
3001 s.connect((HOST, server.port))
3002 try:
3003 s.write(b'data')
3004 s.read(4)
3005 except ssl.SSLError as e:
3006 if support.verbose:
3007 sys.stdout.write("\nSSLError is %r\n" % e)
3008 except OSError as e:
3009 if e.errno != errno.ECONNRESET:
3010 raise
3011 if support.verbose:
3012 sys.stdout.write("\nsocket.error is %r\n" % e)
3013 else:
3014 self.fail("Use of invalid cert should have failed!")
3015
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003016 def test_rude_shutdown(self):
3017 """A brutal shutdown of an SSL server should raise an OSError
3018 in the client when attempting handshake.
3019 """
3020 listener_ready = threading.Event()
3021 listener_gone = threading.Event()
3022
3023 s = socket.socket()
3024 port = support.bind_port(s, HOST)
3025
3026 # `listener` runs in a thread. It sits in an accept() until
3027 # the main thread connects. Then it rudely closes the socket,
3028 # and sets Event `listener_gone` to let the main thread know
3029 # the socket is gone.
3030 def listener():
3031 s.listen()
3032 listener_ready.set()
3033 newsock, addr = s.accept()
3034 newsock.close()
3035 s.close()
3036 listener_gone.set()
3037
3038 def connector():
3039 listener_ready.wait()
3040 with socket.socket() as c:
3041 c.connect((HOST, port))
3042 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003043 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003044 ssl_sock = test_wrap_socket(c)
3045 except OSError:
3046 pass
3047 else:
3048 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003049
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003050 t = threading.Thread(target=listener)
3051 t.start()
3052 try:
3053 connector()
3054 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003055 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003056
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003057 def test_ssl_cert_verify_error(self):
3058 if support.verbose:
3059 sys.stdout.write("\n")
3060
3061 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3062 server_context.load_cert_chain(SIGNED_CERTFILE)
3063
3064 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3065
3066 server = ThreadedEchoServer(context=server_context, chatty=True)
3067 with server:
3068 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003069 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003070 try:
3071 s.connect((HOST, server.port))
3072 except ssl.SSLError as e:
3073 msg = 'unable to get local issuer certificate'
3074 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3075 self.assertEqual(e.verify_code, 20)
3076 self.assertEqual(e.verify_message, msg)
3077 self.assertIn(msg, repr(e))
3078 self.assertIn('certificate verify failed', repr(e))
3079
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003080 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3081 "OpenSSL is compiled without SSLv2 support")
3082 def test_protocol_sslv2(self):
3083 """Connecting to an SSLv2 server with various client options"""
3084 if support.verbose:
3085 sys.stdout.write("\n")
3086 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3087 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3088 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003089 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003090 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3091 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3092 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3093 # SSLv23 client with specific SSL options
3094 if no_sslv2_implies_sslv3_hello():
3095 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003096 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003097 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003098 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003099 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003100 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003101 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003102
Christian Heimesa170fa12017-09-15 20:27:30 +02003103 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003104 """Connecting to an SSLv23 server with various client options"""
3105 if support.verbose:
3106 sys.stdout.write("\n")
3107 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003108 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003109 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003110 except OSError as x:
3111 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3112 if support.verbose:
3113 sys.stdout.write(
3114 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3115 % str(x))
3116 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003117 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3118 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3119 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003120
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003121 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003122 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3123 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3124 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003125
3126 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003127 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3128 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3129 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003130
3131 # Server with specific SSL options
3132 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003133 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003134 server_options=ssl.OP_NO_SSLv3)
3135 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003136 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003137 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003138 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003139 server_options=ssl.OP_NO_TLSv1)
3140
3141
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003142 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3143 "OpenSSL is compiled without SSLv3 support")
3144 def test_protocol_sslv3(self):
3145 """Connecting to an SSLv3 server with various client options"""
3146 if support.verbose:
3147 sys.stdout.write("\n")
3148 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3149 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3150 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3151 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3152 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003153 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003154 client_options=ssl.OP_NO_SSLv3)
3155 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3156 if no_sslv2_implies_sslv3_hello():
3157 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003158 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003159 False, client_options=ssl.OP_NO_SSLv2)
3160
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003161 def test_protocol_tlsv1(self):
3162 """Connecting to a TLSv1 server with various client options"""
3163 if support.verbose:
3164 sys.stdout.write("\n")
3165 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3166 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3167 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3168 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3169 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3170 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3171 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003172 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003173 client_options=ssl.OP_NO_TLSv1)
3174
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003175 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3176 "TLS version 1.1 not supported.")
3177 def test_protocol_tlsv1_1(self):
3178 """Connecting to a TLSv1.1 server with various client options.
3179 Testing against older TLS versions."""
3180 if support.verbose:
3181 sys.stdout.write("\n")
3182 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3183 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3184 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3185 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3186 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003187 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003188 client_options=ssl.OP_NO_TLSv1_1)
3189
Christian Heimesa170fa12017-09-15 20:27:30 +02003190 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003191 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3192 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003194 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3195 "TLS version 1.2 not supported.")
3196 def test_protocol_tlsv1_2(self):
3197 """Connecting to a TLSv1.2 server with various client options.
3198 Testing against older TLS versions."""
3199 if support.verbose:
3200 sys.stdout.write("\n")
3201 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3202 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3203 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3204 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3205 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3206 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3207 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003208 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 client_options=ssl.OP_NO_TLSv1_2)
3210
Christian Heimesa170fa12017-09-15 20:27:30 +02003211 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003212 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3213 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3214 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3215 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3216
3217 def test_starttls(self):
3218 """Switching from clear text to encrypted and back again."""
3219 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3220
3221 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003222 starttls_server=True,
3223 chatty=True,
3224 connectionchatty=True)
3225 wrapped = False
3226 with server:
3227 s = socket.socket()
3228 s.setblocking(1)
3229 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003230 if support.verbose:
3231 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003232 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003233 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 sys.stdout.write(
3235 " client: sending %r...\n" % indata)
3236 if wrapped:
3237 conn.write(indata)
3238 outdata = conn.read()
3239 else:
3240 s.send(indata)
3241 outdata = s.recv(1024)
3242 msg = outdata.strip().lower()
3243 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3244 # STARTTLS ok, switch to secure mode
3245 if support.verbose:
3246 sys.stdout.write(
3247 " client: read %r from server, starting TLS...\n"
3248 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003249 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003250 wrapped = True
3251 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3252 # ENDTLS ok, switch back to clear text
3253 if support.verbose:
3254 sys.stdout.write(
3255 " client: read %r from server, ending TLS...\n"
3256 % msg)
3257 s = conn.unwrap()
3258 wrapped = False
3259 else:
3260 if support.verbose:
3261 sys.stdout.write(
3262 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003263 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003264 sys.stdout.write(" client: closing connection.\n")
3265 if wrapped:
3266 conn.write(b"over\n")
3267 else:
3268 s.send(b"over\n")
3269 if wrapped:
3270 conn.close()
3271 else:
3272 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003273
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003274 def test_socketserver(self):
3275 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003276 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003277 # try to connect
3278 if support.verbose:
3279 sys.stdout.write('\n')
3280 with open(CERTFILE, 'rb') as f:
3281 d1 = f.read()
3282 d2 = ''
3283 # now fetch the same data from the HTTPS server
3284 url = 'https://localhost:%d/%s' % (
3285 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003286 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003287 f = urllib.request.urlopen(url, context=context)
3288 try:
3289 dlen = f.info().get("content-length")
3290 if dlen and (int(dlen) > 0):
3291 d2 = f.read(int(dlen))
3292 if support.verbose:
3293 sys.stdout.write(
3294 " client: read %d bytes from remote server '%s'\n"
3295 % (len(d2), server))
3296 finally:
3297 f.close()
3298 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003299
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 def test_asyncore_server(self):
3301 """Check the example asyncore integration."""
3302 if support.verbose:
3303 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003304
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003305 indata = b"FOO\n"
3306 server = AsyncoreEchoServer(CERTFILE)
3307 with server:
3308 s = test_wrap_socket(socket.socket())
3309 s.connect(('127.0.0.1', server.port))
3310 if support.verbose:
3311 sys.stdout.write(
3312 " client: sending %r...\n" % indata)
3313 s.write(indata)
3314 outdata = s.read()
3315 if support.verbose:
3316 sys.stdout.write(" client: read %r\n" % outdata)
3317 if outdata != indata.lower():
3318 self.fail(
3319 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3320 % (outdata[:20], len(outdata),
3321 indata[:20].lower(), len(indata)))
3322 s.write(b"over\n")
3323 if support.verbose:
3324 sys.stdout.write(" client: closing connection.\n")
3325 s.close()
3326 if support.verbose:
3327 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003328
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003329 def test_recv_send(self):
3330 """Test recv(), send() and friends."""
3331 if support.verbose:
3332 sys.stdout.write("\n")
3333
3334 server = ThreadedEchoServer(CERTFILE,
3335 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003336 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003337 cacerts=CERTFILE,
3338 chatty=True,
3339 connectionchatty=False)
3340 with server:
3341 s = test_wrap_socket(socket.socket(),
3342 server_side=False,
3343 certfile=CERTFILE,
3344 ca_certs=CERTFILE,
3345 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003346 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003347 s.connect((HOST, server.port))
3348 # helper methods for standardising recv* method signatures
3349 def _recv_into():
3350 b = bytearray(b"\0"*100)
3351 count = s.recv_into(b)
3352 return b[:count]
3353
3354 def _recvfrom_into():
3355 b = bytearray(b"\0"*100)
3356 count, addr = s.recvfrom_into(b)
3357 return b[:count]
3358
3359 # (name, method, expect success?, *args, return value func)
3360 send_methods = [
3361 ('send', s.send, True, [], len),
3362 ('sendto', s.sendto, False, ["some.address"], len),
3363 ('sendall', s.sendall, True, [], lambda x: None),
3364 ]
3365 # (name, method, whether to expect success, *args)
3366 recv_methods = [
3367 ('recv', s.recv, True, []),
3368 ('recvfrom', s.recvfrom, False, ["some.address"]),
3369 ('recv_into', _recv_into, True, []),
3370 ('recvfrom_into', _recvfrom_into, False, []),
3371 ]
3372 data_prefix = "PREFIX_"
3373
3374 for (meth_name, send_meth, expect_success, args,
3375 ret_val_meth) in send_methods:
3376 indata = (data_prefix + meth_name).encode('ascii')
3377 try:
3378 ret = send_meth(indata, *args)
3379 msg = "sending with {}".format(meth_name)
3380 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3381 outdata = s.read()
3382 if outdata != indata.lower():
3383 self.fail(
3384 "While sending with <<{name:s}>> bad data "
3385 "<<{outdata:r}>> ({nout:d}) received; "
3386 "expected <<{indata:r}>> ({nin:d})\n".format(
3387 name=meth_name, outdata=outdata[:20],
3388 nout=len(outdata),
3389 indata=indata[:20], nin=len(indata)
3390 )
3391 )
3392 except ValueError as e:
3393 if expect_success:
3394 self.fail(
3395 "Failed to send with method <<{name:s}>>; "
3396 "expected to succeed.\n".format(name=meth_name)
3397 )
3398 if not str(e).startswith(meth_name):
3399 self.fail(
3400 "Method <<{name:s}>> failed with unexpected "
3401 "exception message: {exp:s}\n".format(
3402 name=meth_name, exp=e
3403 )
3404 )
3405
3406 for meth_name, recv_meth, expect_success, args in recv_methods:
3407 indata = (data_prefix + meth_name).encode('ascii')
3408 try:
3409 s.send(indata)
3410 outdata = recv_meth(*args)
3411 if outdata != indata.lower():
3412 self.fail(
3413 "While receiving with <<{name:s}>> bad data "
3414 "<<{outdata:r}>> ({nout:d}) received; "
3415 "expected <<{indata:r}>> ({nin:d})\n".format(
3416 name=meth_name, outdata=outdata[:20],
3417 nout=len(outdata),
3418 indata=indata[:20], nin=len(indata)
3419 )
3420 )
3421 except ValueError as e:
3422 if expect_success:
3423 self.fail(
3424 "Failed to receive with method <<{name:s}>>; "
3425 "expected to succeed.\n".format(name=meth_name)
3426 )
3427 if not str(e).startswith(meth_name):
3428 self.fail(
3429 "Method <<{name:s}>> failed with unexpected "
3430 "exception message: {exp:s}\n".format(
3431 name=meth_name, exp=e
3432 )
3433 )
3434 # consume data
3435 s.read()
3436
3437 # read(-1, buffer) is supported, even though read(-1) is not
3438 data = b"data"
3439 s.send(data)
3440 buffer = bytearray(len(data))
3441 self.assertEqual(s.read(-1, buffer), len(data))
3442 self.assertEqual(buffer, data)
3443
Christian Heimes888bbdc2017-09-07 14:18:21 -07003444 # sendall accepts bytes-like objects
3445 if ctypes is not None:
3446 ubyte = ctypes.c_ubyte * len(data)
3447 byteslike = ubyte.from_buffer_copy(data)
3448 s.sendall(byteslike)
3449 self.assertEqual(s.read(), data)
3450
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003451 # Make sure sendmsg et al are disallowed to avoid
3452 # inadvertent disclosure of data and/or corruption
3453 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003454 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003455 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3456 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3457 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003458 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003459 s.write(b"over\n")
3460
3461 self.assertRaises(ValueError, s.recv, -1)
3462 self.assertRaises(ValueError, s.read, -1)
3463
3464 s.close()
3465
3466 def test_recv_zero(self):
3467 server = ThreadedEchoServer(CERTFILE)
3468 server.__enter__()
3469 self.addCleanup(server.__exit__, None, None)
3470 s = socket.create_connection((HOST, server.port))
3471 self.addCleanup(s.close)
3472 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3473 self.addCleanup(s.close)
3474
3475 # recv/read(0) should return no data
3476 s.send(b"data")
3477 self.assertEqual(s.recv(0), b"")
3478 self.assertEqual(s.read(0), b"")
3479 self.assertEqual(s.read(), b"data")
3480
3481 # Should not block if the other end sends no data
3482 s.setblocking(False)
3483 self.assertEqual(s.recv(0), b"")
3484 self.assertEqual(s.recv_into(bytearray()), 0)
3485
3486 def test_nonblocking_send(self):
3487 server = ThreadedEchoServer(CERTFILE,
3488 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003489 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003490 cacerts=CERTFILE,
3491 chatty=True,
3492 connectionchatty=False)
3493 with server:
3494 s = test_wrap_socket(socket.socket(),
3495 server_side=False,
3496 certfile=CERTFILE,
3497 ca_certs=CERTFILE,
3498 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003499 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003500 s.connect((HOST, server.port))
3501 s.setblocking(False)
3502
3503 # If we keep sending data, at some point the buffers
3504 # will be full and the call will block
3505 buf = bytearray(8192)
3506 def fill_buffer():
3507 while True:
3508 s.send(buf)
3509 self.assertRaises((ssl.SSLWantWriteError,
3510 ssl.SSLWantReadError), fill_buffer)
3511
3512 # Now read all the output and discard it
3513 s.setblocking(True)
3514 s.close()
3515
3516 def test_handshake_timeout(self):
3517 # Issue #5103: SSL handshake must respect the socket timeout
3518 server = socket.socket(socket.AF_INET)
3519 host = "127.0.0.1"
3520 port = support.bind_port(server)
3521 started = threading.Event()
3522 finish = False
3523
3524 def serve():
3525 server.listen()
3526 started.set()
3527 conns = []
3528 while not finish:
3529 r, w, e = select.select([server], [], [], 0.1)
3530 if server in r:
3531 # Let the socket hang around rather than having
3532 # it closed by garbage collection.
3533 conns.append(server.accept()[0])
3534 for sock in conns:
3535 sock.close()
3536
3537 t = threading.Thread(target=serve)
3538 t.start()
3539 started.wait()
3540
3541 try:
3542 try:
3543 c = socket.socket(socket.AF_INET)
3544 c.settimeout(0.2)
3545 c.connect((host, port))
3546 # Will attempt handshake and time out
3547 self.assertRaisesRegex(socket.timeout, "timed out",
3548 test_wrap_socket, c)
3549 finally:
3550 c.close()
3551 try:
3552 c = socket.socket(socket.AF_INET)
3553 c = test_wrap_socket(c)
3554 c.settimeout(0.2)
3555 # Will attempt handshake and time out
3556 self.assertRaisesRegex(socket.timeout, "timed out",
3557 c.connect, (host, port))
3558 finally:
3559 c.close()
3560 finally:
3561 finish = True
3562 t.join()
3563 server.close()
3564
3565 def test_server_accept(self):
3566 # Issue #16357: accept() on a SSLSocket created through
3567 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003568 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003569 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003570 context.load_verify_locations(SIGNING_CA)
3571 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003572 server = socket.socket(socket.AF_INET)
3573 host = "127.0.0.1"
3574 port = support.bind_port(server)
3575 server = context.wrap_socket(server, server_side=True)
3576 self.assertTrue(server.server_side)
3577
3578 evt = threading.Event()
3579 remote = None
3580 peer = None
3581 def serve():
3582 nonlocal remote, peer
3583 server.listen()
3584 # Block on the accept and wait on the connection to close.
3585 evt.set()
3586 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003587 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003588
3589 t = threading.Thread(target=serve)
3590 t.start()
3591 # Client wait until server setup and perform a connect.
3592 evt.wait()
3593 client = context.wrap_socket(socket.socket())
3594 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003595 client.send(b'data')
3596 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003597 client_addr = client.getsockname()
3598 client.close()
3599 t.join()
3600 remote.close()
3601 server.close()
3602 # Sanity checks.
3603 self.assertIsInstance(remote, ssl.SSLSocket)
3604 self.assertEqual(peer, client_addr)
3605
3606 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003607 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 with context.wrap_socket(socket.socket()) as sock:
3609 with self.assertRaises(OSError) as cm:
3610 sock.getpeercert()
3611 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3612
3613 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003614 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003615 with context.wrap_socket(socket.socket()) as sock:
3616 with self.assertRaises(OSError) as cm:
3617 sock.do_handshake()
3618 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3619
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003620 def test_no_shared_ciphers(self):
3621 client_context, server_context, hostname = testing_context()
3622 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3623 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003624 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003625 client_context.set_ciphers("AES128")
3626 server_context.set_ciphers("AES256")
3627 with ThreadedEchoServer(context=server_context) as server:
3628 with client_context.wrap_socket(socket.socket(),
3629 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003630 with self.assertRaises(OSError):
3631 s.connect((HOST, server.port))
3632 self.assertIn("no shared cipher", server.conn_errors[0])
3633
3634 def test_version_basic(self):
3635 """
3636 Basic tests for SSLSocket.version().
3637 More tests are done in the test_protocol_*() methods.
3638 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003639 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3640 context.check_hostname = False
3641 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003642 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003643 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003644 chatty=False) as server:
3645 with context.wrap_socket(socket.socket()) as s:
3646 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003647 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003648 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003649 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003650 self.assertEqual(s.version(), 'TLSv1.3')
3651 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003652 self.assertEqual(s.version(), 'TLSv1.2')
3653 else: # 0.9.8 to 1.0.1
3654 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003655 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003656 self.assertIs(s.version(), None)
3657
Christian Heimescb5b68a2017-09-07 18:07:00 -07003658 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3659 "test requires TLSv1.3 enabled OpenSSL")
3660 def test_tls1_3(self):
3661 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3662 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003663 context.options |= (
3664 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3665 )
3666 with ThreadedEchoServer(context=context) as server:
3667 with context.wrap_socket(socket.socket()) as s:
3668 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003669 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003670 'TLS_AES_256_GCM_SHA384',
3671 'TLS_CHACHA20_POLY1305_SHA256',
3672 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003673 })
3674 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003675
Christian Heimes698dde12018-02-27 11:54:43 +01003676 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3677 "required OpenSSL 1.1.0g")
3678 def test_min_max_version(self):
3679 client_context, server_context, hostname = testing_context()
3680 # client TLSv1.0 to 1.2
3681 client_context.minimum_version = ssl.TLSVersion.TLSv1
3682 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3683 # server only TLSv1.2
3684 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3685 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3686
3687 with ThreadedEchoServer(context=server_context) as server:
3688 with client_context.wrap_socket(socket.socket(),
3689 server_hostname=hostname) as s:
3690 s.connect((HOST, server.port))
3691 self.assertEqual(s.version(), 'TLSv1.2')
3692
3693 # client 1.0 to 1.2, server 1.0 to 1.1
3694 server_context.minimum_version = ssl.TLSVersion.TLSv1
3695 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3696
3697 with ThreadedEchoServer(context=server_context) as server:
3698 with client_context.wrap_socket(socket.socket(),
3699 server_hostname=hostname) as s:
3700 s.connect((HOST, server.port))
3701 self.assertEqual(s.version(), 'TLSv1.1')
3702
3703 # client 1.0, server 1.2 (mismatch)
3704 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3705 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3706 client_context.minimum_version = ssl.TLSVersion.TLSv1
3707 client_context.maximum_version = ssl.TLSVersion.TLSv1
3708 with ThreadedEchoServer(context=server_context) as server:
3709 with client_context.wrap_socket(socket.socket(),
3710 server_hostname=hostname) as s:
3711 with self.assertRaises(ssl.SSLError) as e:
3712 s.connect((HOST, server.port))
3713 self.assertIn("alert", str(e.exception))
3714
3715
3716 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3717 "required OpenSSL 1.1.0g")
3718 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3719 def test_min_max_version_sslv3(self):
3720 client_context, server_context, hostname = testing_context()
3721 server_context.minimum_version = ssl.TLSVersion.SSLv3
3722 client_context.minimum_version = ssl.TLSVersion.SSLv3
3723 client_context.maximum_version = ssl.TLSVersion.SSLv3
3724 with ThreadedEchoServer(context=server_context) as server:
3725 with client_context.wrap_socket(socket.socket(),
3726 server_hostname=hostname) as s:
3727 s.connect((HOST, server.port))
3728 self.assertEqual(s.version(), 'SSLv3')
3729
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003730 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3731 def test_default_ecdh_curve(self):
3732 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3733 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003734 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003735 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003736 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3737 # cipher name.
3738 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003739 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3740 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3741 # our default cipher list should prefer ECDH-based ciphers
3742 # automatically.
3743 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3744 context.set_ciphers("ECCdraft:ECDH")
3745 with ThreadedEchoServer(context=context) as server:
3746 with context.wrap_socket(socket.socket()) as s:
3747 s.connect((HOST, server.port))
3748 self.assertIn("ECDH", s.cipher()[0])
3749
3750 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3751 "'tls-unique' channel binding not available")
3752 def test_tls_unique_channel_binding(self):
3753 """Test tls-unique channel binding."""
3754 if support.verbose:
3755 sys.stdout.write("\n")
3756
Christian Heimes05d9fe32018-02-27 08:55:39 +01003757 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003758
3759 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003760 chatty=True,
3761 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003762
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003763 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003764 with client_context.wrap_socket(
3765 socket.socket(),
3766 server_hostname=hostname) as s:
3767 s.connect((HOST, server.port))
3768 # get the data
3769 cb_data = s.get_channel_binding("tls-unique")
3770 if support.verbose:
3771 sys.stdout.write(
3772 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003773
Christian Heimes05d9fe32018-02-27 08:55:39 +01003774 # check if it is sane
3775 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003776 if s.version() == 'TLSv1.3':
3777 self.assertEqual(len(cb_data), 48)
3778 else:
3779 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003780
Christian Heimes05d9fe32018-02-27 08:55:39 +01003781 # and compare with the peers version
3782 s.write(b"CB tls-unique\n")
3783 peer_data_repr = s.read().strip()
3784 self.assertEqual(peer_data_repr,
3785 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003786
3787 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003788 with client_context.wrap_socket(
3789 socket.socket(),
3790 server_hostname=hostname) as s:
3791 s.connect((HOST, server.port))
3792 new_cb_data = s.get_channel_binding("tls-unique")
3793 if support.verbose:
3794 sys.stdout.write(
3795 "got another channel binding data: {0!r}\n".format(
3796 new_cb_data)
3797 )
3798 # is it really unique
3799 self.assertNotEqual(cb_data, new_cb_data)
3800 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003801 if s.version() == 'TLSv1.3':
3802 self.assertEqual(len(cb_data), 48)
3803 else:
3804 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003805 s.write(b"CB tls-unique\n")
3806 peer_data_repr = s.read().strip()
3807 self.assertEqual(peer_data_repr,
3808 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003809
3810 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003811 client_context, server_context, hostname = testing_context()
3812 stats = server_params_test(client_context, server_context,
3813 chatty=True, connectionchatty=True,
3814 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003815 if support.verbose:
3816 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3817 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3818
3819 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3820 "ssl.OP_NO_COMPRESSION needed for this test")
3821 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003822 client_context, server_context, hostname = testing_context()
3823 client_context.options |= ssl.OP_NO_COMPRESSION
3824 server_context.options |= ssl.OP_NO_COMPRESSION
3825 stats = server_params_test(client_context, server_context,
3826 chatty=True, connectionchatty=True,
3827 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003828 self.assertIs(stats['compression'], None)
3829
3830 def test_dh_params(self):
3831 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003832 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003833 # test scenario needs TLS <= 1.2
3834 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003835 server_context.load_dh_params(DHFILE)
3836 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003837 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003838 stats = server_params_test(client_context, server_context,
3839 chatty=True, connectionchatty=True,
3840 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003841 cipher = stats["cipher"][0]
3842 parts = cipher.split("-")
3843 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3844 self.fail("Non-DH cipher: " + cipher[0])
3845
Christian Heimesb7b92252018-02-25 09:49:31 +01003846 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003847 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003848 def test_ecdh_curve(self):
3849 # server secp384r1, client auto
3850 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003851
Christian Heimesb7b92252018-02-25 09:49:31 +01003852 server_context.set_ecdh_curve("secp384r1")
3853 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3854 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3855 stats = server_params_test(client_context, server_context,
3856 chatty=True, connectionchatty=True,
3857 sni_name=hostname)
3858
3859 # server auto, client secp384r1
3860 client_context, server_context, hostname = testing_context()
3861 client_context.set_ecdh_curve("secp384r1")
3862 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3863 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3864 stats = server_params_test(client_context, server_context,
3865 chatty=True, connectionchatty=True,
3866 sni_name=hostname)
3867
3868 # server / client curve mismatch
3869 client_context, server_context, hostname = testing_context()
3870 client_context.set_ecdh_curve("prime256v1")
3871 server_context.set_ecdh_curve("secp384r1")
3872 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3873 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3874 try:
3875 stats = server_params_test(client_context, server_context,
3876 chatty=True, connectionchatty=True,
3877 sni_name=hostname)
3878 except ssl.SSLError:
3879 pass
3880 else:
3881 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003882 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003883 self.fail("mismatch curve did not fail")
3884
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003885 def test_selected_alpn_protocol(self):
3886 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003887 client_context, server_context, hostname = testing_context()
3888 stats = server_params_test(client_context, server_context,
3889 chatty=True, connectionchatty=True,
3890 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003891 self.assertIs(stats['client_alpn_protocol'], None)
3892
3893 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3894 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3895 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003896 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003897 server_context.set_alpn_protocols(['foo', 'bar'])
3898 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003899 chatty=True, connectionchatty=True,
3900 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003901 self.assertIs(stats['client_alpn_protocol'], None)
3902
3903 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3904 def test_alpn_protocols(self):
3905 server_protocols = ['foo', 'bar', 'milkshake']
3906 protocol_tests = [
3907 (['foo', 'bar'], 'foo'),
3908 (['bar', 'foo'], 'foo'),
3909 (['milkshake'], 'milkshake'),
3910 (['http/3.0', 'http/4.0'], None)
3911 ]
3912 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003913 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003914 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003915 client_context.set_alpn_protocols(client_protocols)
3916
3917 try:
3918 stats = server_params_test(client_context,
3919 server_context,
3920 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003921 connectionchatty=True,
3922 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003923 except ssl.SSLError as e:
3924 stats = e
3925
Christian Heimes05d9fe32018-02-27 08:55:39 +01003926 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003927 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3928 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3929 self.assertIsInstance(stats, ssl.SSLError)
3930 else:
3931 msg = "failed trying %s (s) and %s (c).\n" \
3932 "was expecting %s, but got %%s from the %%s" \
3933 % (str(server_protocols), str(client_protocols),
3934 str(expected))
3935 client_result = stats['client_alpn_protocol']
3936 self.assertEqual(client_result, expected,
3937 msg % (client_result, "client"))
3938 server_result = stats['server_alpn_protocols'][-1] \
3939 if len(stats['server_alpn_protocols']) else 'nothing'
3940 self.assertEqual(server_result, expected,
3941 msg % (server_result, "server"))
3942
3943 def test_selected_npn_protocol(self):
3944 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003945 client_context, server_context, hostname = testing_context()
3946 stats = server_params_test(client_context, server_context,
3947 chatty=True, connectionchatty=True,
3948 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003949 self.assertIs(stats['client_npn_protocol'], None)
3950
3951 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3952 def test_npn_protocols(self):
3953 server_protocols = ['http/1.1', 'spdy/2']
3954 protocol_tests = [
3955 (['http/1.1', 'spdy/2'], 'http/1.1'),
3956 (['spdy/2', 'http/1.1'], 'http/1.1'),
3957 (['spdy/2', 'test'], 'spdy/2'),
3958 (['abc', 'def'], 'abc')
3959 ]
3960 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003961 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003962 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003963 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003964 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003965 chatty=True, connectionchatty=True,
3966 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003967 msg = "failed trying %s (s) and %s (c).\n" \
3968 "was expecting %s, but got %%s from the %%s" \
3969 % (str(server_protocols), str(client_protocols),
3970 str(expected))
3971 client_result = stats['client_npn_protocol']
3972 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3973 server_result = stats['server_npn_protocols'][-1] \
3974 if len(stats['server_npn_protocols']) else 'nothing'
3975 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3976
3977 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003978 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003979 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003980 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003981 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003982 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003983 client_context.load_verify_locations(SIGNING_CA)
3984 return server_context, other_context, client_context
3985
3986 def check_common_name(self, stats, name):
3987 cert = stats['peercert']
3988 self.assertIn((('commonName', name),), cert['subject'])
3989
3990 @needs_sni
3991 def test_sni_callback(self):
3992 calls = []
3993 server_context, other_context, client_context = self.sni_contexts()
3994
Christian Heimesa170fa12017-09-15 20:27:30 +02003995 client_context.check_hostname = False
3996
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003997 def servername_cb(ssl_sock, server_name, initial_context):
3998 calls.append((server_name, initial_context))
3999 if server_name is not None:
4000 ssl_sock.context = other_context
4001 server_context.set_servername_callback(servername_cb)
4002
4003 stats = server_params_test(client_context, server_context,
4004 chatty=True,
4005 sni_name='supermessage')
4006 # The hostname was fetched properly, and the certificate was
4007 # changed for the connection.
4008 self.assertEqual(calls, [("supermessage", server_context)])
4009 # CERTFILE4 was selected
4010 self.check_common_name(stats, 'fakehostname')
4011
4012 calls = []
4013 # The callback is called with server_name=None
4014 stats = server_params_test(client_context, server_context,
4015 chatty=True,
4016 sni_name=None)
4017 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004018 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004019
4020 # Check disabling the callback
4021 calls = []
4022 server_context.set_servername_callback(None)
4023
4024 stats = server_params_test(client_context, server_context,
4025 chatty=True,
4026 sni_name='notfunny')
4027 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004028 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004029 self.assertEqual(calls, [])
4030
4031 @needs_sni
4032 def test_sni_callback_alert(self):
4033 # Returning a TLS alert is reflected to the connecting client
4034 server_context, other_context, client_context = self.sni_contexts()
4035
4036 def cb_returning_alert(ssl_sock, server_name, initial_context):
4037 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4038 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004039 with self.assertRaises(ssl.SSLError) as cm:
4040 stats = server_params_test(client_context, server_context,
4041 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004042 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004043 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004044
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004045 @needs_sni
4046 def test_sni_callback_raising(self):
4047 # Raising fails the connection with a TLS handshake failure alert.
4048 server_context, other_context, client_context = self.sni_contexts()
4049
4050 def cb_raising(ssl_sock, server_name, initial_context):
4051 1/0
4052 server_context.set_servername_callback(cb_raising)
4053
Victor Stinner00253502019-06-03 03:51:43 +02004054 with support.catch_unraisable_exception() as catch:
4055 with self.assertRaises(ssl.SSLError) as cm:
4056 stats = server_params_test(client_context, server_context,
4057 chatty=False,
4058 sni_name='supermessage')
4059
4060 self.assertEqual(cm.exception.reason,
4061 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4062 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004063
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004064 @needs_sni
4065 def test_sni_callback_wrong_return_type(self):
4066 # Returning the wrong return type terminates the TLS connection
4067 # with an internal error alert.
4068 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004069
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004070 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4071 return "foo"
4072 server_context.set_servername_callback(cb_wrong_return_type)
4073
Victor Stinner00253502019-06-03 03:51:43 +02004074 with support.catch_unraisable_exception() as catch:
4075 with self.assertRaises(ssl.SSLError) as cm:
4076 stats = server_params_test(client_context, server_context,
4077 chatty=False,
4078 sni_name='supermessage')
4079
4080
4081 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4082 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004083
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004084 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004085 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004086 client_context.set_ciphers("AES128:AES256")
4087 server_context.set_ciphers("AES256")
4088 expected_algs = [
4089 "AES256", "AES-256",
4090 # TLS 1.3 ciphers are always enabled
4091 "TLS_CHACHA20", "TLS_AES",
4092 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004093
Christian Heimesa170fa12017-09-15 20:27:30 +02004094 stats = server_params_test(client_context, server_context,
4095 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 ciphers = stats['server_shared_ciphers'][0]
4097 self.assertGreater(len(ciphers), 0)
4098 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004099 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004100 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004101
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004102 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004103 client_context, server_context, hostname = testing_context()
4104 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004105
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004106 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004107 s = client_context.wrap_socket(socket.socket(),
4108 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004109 s.connect((HOST, server.port))
4110 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004111
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004112 self.assertRaises(ValueError, s.read, 1024)
4113 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004114
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004115 def test_sendfile(self):
4116 TEST_DATA = b"x" * 512
4117 with open(support.TESTFN, 'wb') as f:
4118 f.write(TEST_DATA)
4119 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004120 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004121 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004122 context.load_verify_locations(SIGNING_CA)
4123 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004124 server = ThreadedEchoServer(context=context, chatty=False)
4125 with server:
4126 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004127 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004128 with open(support.TESTFN, 'rb') as file:
4129 s.sendfile(file)
4130 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004131
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004132 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004133 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004134 # TODO: sessions aren't compatible with TLSv1.3 yet
4135 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004136
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004137 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004138 stats = server_params_test(client_context, server_context,
4139 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004140 session = stats['session']
4141 self.assertTrue(session.id)
4142 self.assertGreater(session.time, 0)
4143 self.assertGreater(session.timeout, 0)
4144 self.assertTrue(session.has_ticket)
4145 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4146 self.assertGreater(session.ticket_lifetime_hint, 0)
4147 self.assertFalse(stats['session_reused'])
4148 sess_stat = server_context.session_stats()
4149 self.assertEqual(sess_stat['accept'], 1)
4150 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004151
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004152 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004153 stats = server_params_test(client_context, server_context,
4154 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004155 sess_stat = server_context.session_stats()
4156 self.assertEqual(sess_stat['accept'], 2)
4157 self.assertEqual(sess_stat['hits'], 1)
4158 self.assertTrue(stats['session_reused'])
4159 session2 = stats['session']
4160 self.assertEqual(session2.id, session.id)
4161 self.assertEqual(session2, session)
4162 self.assertIsNot(session2, session)
4163 self.assertGreaterEqual(session2.time, session.time)
4164 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004165
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004166 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004167 stats = server_params_test(client_context, server_context,
4168 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004169 self.assertFalse(stats['session_reused'])
4170 session3 = stats['session']
4171 self.assertNotEqual(session3.id, session.id)
4172 self.assertNotEqual(session3, session)
4173 sess_stat = server_context.session_stats()
4174 self.assertEqual(sess_stat['accept'], 3)
4175 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004176
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004177 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004178 stats = server_params_test(client_context, server_context,
4179 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004180 self.assertTrue(stats['session_reused'])
4181 session4 = stats['session']
4182 self.assertEqual(session4.id, session.id)
4183 self.assertEqual(session4, session)
4184 self.assertGreaterEqual(session4.time, session.time)
4185 self.assertGreaterEqual(session4.timeout, session.timeout)
4186 sess_stat = server_context.session_stats()
4187 self.assertEqual(sess_stat['accept'], 4)
4188 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004190 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004191 client_context, server_context, hostname = testing_context()
4192 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004193
Christian Heimes05d9fe32018-02-27 08:55:39 +01004194 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004195 client_context.options |= ssl.OP_NO_TLSv1_3
4196 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004197
Christian Heimesa170fa12017-09-15 20:27:30 +02004198 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004199 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004200 with client_context.wrap_socket(socket.socket(),
4201 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004202 # session is None before handshake
4203 self.assertEqual(s.session, None)
4204 self.assertEqual(s.session_reused, None)
4205 s.connect((HOST, server.port))
4206 session = s.session
4207 self.assertTrue(session)
4208 with self.assertRaises(TypeError) as e:
4209 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004210 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004211
Christian Heimesa170fa12017-09-15 20:27:30 +02004212 with client_context.wrap_socket(socket.socket(),
4213 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004214 s.connect((HOST, server.port))
4215 # cannot set session after handshake
4216 with self.assertRaises(ValueError) as e:
4217 s.session = session
4218 self.assertEqual(str(e.exception),
4219 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004220
Christian Heimesa170fa12017-09-15 20:27:30 +02004221 with client_context.wrap_socket(socket.socket(),
4222 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004223 # can set session before handshake and before the
4224 # connection was established
4225 s.session = session
4226 s.connect((HOST, server.port))
4227 self.assertEqual(s.session.id, session.id)
4228 self.assertEqual(s.session, session)
4229 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004230
Christian Heimesa170fa12017-09-15 20:27:30 +02004231 with client_context2.wrap_socket(socket.socket(),
4232 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004233 # cannot re-use session with a different SSLContext
4234 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004235 s.session = session
4236 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004237 self.assertEqual(str(e.exception),
4238 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004239
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004240
Christian Heimes9fb051f2018-09-23 08:32:31 +02004241@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4242class TestPostHandshakeAuth(unittest.TestCase):
4243 def test_pha_setter(self):
4244 protocols = [
4245 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4246 ]
4247 for protocol in protocols:
4248 ctx = ssl.SSLContext(protocol)
4249 self.assertEqual(ctx.post_handshake_auth, False)
4250
4251 ctx.post_handshake_auth = True
4252 self.assertEqual(ctx.post_handshake_auth, True)
4253
4254 ctx.verify_mode = ssl.CERT_REQUIRED
4255 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4256 self.assertEqual(ctx.post_handshake_auth, True)
4257
4258 ctx.post_handshake_auth = False
4259 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4260 self.assertEqual(ctx.post_handshake_auth, False)
4261
4262 ctx.verify_mode = ssl.CERT_OPTIONAL
4263 ctx.post_handshake_auth = True
4264 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4265 self.assertEqual(ctx.post_handshake_auth, True)
4266
4267 def test_pha_required(self):
4268 client_context, server_context, hostname = testing_context()
4269 server_context.post_handshake_auth = True
4270 server_context.verify_mode = ssl.CERT_REQUIRED
4271 client_context.post_handshake_auth = True
4272 client_context.load_cert_chain(SIGNED_CERTFILE)
4273
4274 server = ThreadedEchoServer(context=server_context, chatty=False)
4275 with server:
4276 with client_context.wrap_socket(socket.socket(),
4277 server_hostname=hostname) as s:
4278 s.connect((HOST, server.port))
4279 s.write(b'HASCERT')
4280 self.assertEqual(s.recv(1024), b'FALSE\n')
4281 s.write(b'PHA')
4282 self.assertEqual(s.recv(1024), b'OK\n')
4283 s.write(b'HASCERT')
4284 self.assertEqual(s.recv(1024), b'TRUE\n')
4285 # PHA method just returns true when cert is already available
4286 s.write(b'PHA')
4287 self.assertEqual(s.recv(1024), b'OK\n')
4288 s.write(b'GETCERT')
4289 cert_text = s.recv(4096).decode('us-ascii')
4290 self.assertIn('Python Software Foundation CA', cert_text)
4291
4292 def test_pha_required_nocert(self):
4293 client_context, server_context, hostname = testing_context()
4294 server_context.post_handshake_auth = True
4295 server_context.verify_mode = ssl.CERT_REQUIRED
4296 client_context.post_handshake_auth = True
4297
4298 server = ThreadedEchoServer(context=server_context, chatty=False)
4299 with server:
4300 with client_context.wrap_socket(socket.socket(),
4301 server_hostname=hostname) as s:
4302 s.connect((HOST, server.port))
4303 s.write(b'PHA')
4304 # receive CertificateRequest
4305 self.assertEqual(s.recv(1024), b'OK\n')
4306 # send empty Certificate + Finish
4307 s.write(b'HASCERT')
4308 # receive alert
4309 with self.assertRaisesRegex(
4310 ssl.SSLError,
4311 'tlsv13 alert certificate required'):
4312 s.recv(1024)
4313
4314 def test_pha_optional(self):
4315 if support.verbose:
4316 sys.stdout.write("\n")
4317
4318 client_context, server_context, hostname = testing_context()
4319 server_context.post_handshake_auth = True
4320 server_context.verify_mode = ssl.CERT_REQUIRED
4321 client_context.post_handshake_auth = True
4322 client_context.load_cert_chain(SIGNED_CERTFILE)
4323
4324 # check CERT_OPTIONAL
4325 server_context.verify_mode = ssl.CERT_OPTIONAL
4326 server = ThreadedEchoServer(context=server_context, chatty=False)
4327 with server:
4328 with client_context.wrap_socket(socket.socket(),
4329 server_hostname=hostname) as s:
4330 s.connect((HOST, server.port))
4331 s.write(b'HASCERT')
4332 self.assertEqual(s.recv(1024), b'FALSE\n')
4333 s.write(b'PHA')
4334 self.assertEqual(s.recv(1024), b'OK\n')
4335 s.write(b'HASCERT')
4336 self.assertEqual(s.recv(1024), b'TRUE\n')
4337
4338 def test_pha_optional_nocert(self):
4339 if support.verbose:
4340 sys.stdout.write("\n")
4341
4342 client_context, server_context, hostname = testing_context()
4343 server_context.post_handshake_auth = True
4344 server_context.verify_mode = ssl.CERT_OPTIONAL
4345 client_context.post_handshake_auth = True
4346
4347 server = ThreadedEchoServer(context=server_context, chatty=False)
4348 with server:
4349 with client_context.wrap_socket(socket.socket(),
4350 server_hostname=hostname) as s:
4351 s.connect((HOST, server.port))
4352 s.write(b'HASCERT')
4353 self.assertEqual(s.recv(1024), b'FALSE\n')
4354 s.write(b'PHA')
4355 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004356 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004357 s.write(b'HASCERT')
4358 self.assertEqual(s.recv(1024), b'FALSE\n')
4359
4360 def test_pha_no_pha_client(self):
4361 client_context, server_context, hostname = testing_context()
4362 server_context.post_handshake_auth = True
4363 server_context.verify_mode = ssl.CERT_REQUIRED
4364 client_context.load_cert_chain(SIGNED_CERTFILE)
4365
4366 server = ThreadedEchoServer(context=server_context, chatty=False)
4367 with server:
4368 with client_context.wrap_socket(socket.socket(),
4369 server_hostname=hostname) as s:
4370 s.connect((HOST, server.port))
4371 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4372 s.verify_client_post_handshake()
4373 s.write(b'PHA')
4374 self.assertIn(b'extension not received', s.recv(1024))
4375
4376 def test_pha_no_pha_server(self):
4377 # server doesn't have PHA enabled, cert is requested in handshake
4378 client_context, server_context, hostname = testing_context()
4379 server_context.verify_mode = ssl.CERT_REQUIRED
4380 client_context.post_handshake_auth = True
4381 client_context.load_cert_chain(SIGNED_CERTFILE)
4382
4383 server = ThreadedEchoServer(context=server_context, chatty=False)
4384 with server:
4385 with client_context.wrap_socket(socket.socket(),
4386 server_hostname=hostname) as s:
4387 s.connect((HOST, server.port))
4388 s.write(b'HASCERT')
4389 self.assertEqual(s.recv(1024), b'TRUE\n')
4390 # PHA doesn't fail if there is already a cert
4391 s.write(b'PHA')
4392 self.assertEqual(s.recv(1024), b'OK\n')
4393 s.write(b'HASCERT')
4394 self.assertEqual(s.recv(1024), b'TRUE\n')
4395
4396 def test_pha_not_tls13(self):
4397 # TLS 1.2
4398 client_context, server_context, hostname = testing_context()
4399 server_context.verify_mode = ssl.CERT_REQUIRED
4400 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4401 client_context.post_handshake_auth = True
4402 client_context.load_cert_chain(SIGNED_CERTFILE)
4403
4404 server = ThreadedEchoServer(context=server_context, chatty=False)
4405 with server:
4406 with client_context.wrap_socket(socket.socket(),
4407 server_hostname=hostname) as s:
4408 s.connect((HOST, server.port))
4409 # PHA fails for TLS != 1.3
4410 s.write(b'PHA')
4411 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4412
4413
Christian Heimesc7f70692019-05-31 11:44:05 +02004414HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4415requires_keylog = unittest.skipUnless(
4416 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4417
4418class TestSSLDebug(unittest.TestCase):
4419
4420 def keylog_lines(self, fname=support.TESTFN):
4421 with open(fname) as f:
4422 return len(list(f))
4423
4424 @requires_keylog
4425 def test_keylog_defaults(self):
4426 self.addCleanup(support.unlink, support.TESTFN)
4427 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4428 self.assertEqual(ctx.keylog_filename, None)
4429
4430 self.assertFalse(os.path.isfile(support.TESTFN))
4431 ctx.keylog_filename = support.TESTFN
4432 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4433 self.assertTrue(os.path.isfile(support.TESTFN))
4434 self.assertEqual(self.keylog_lines(), 1)
4435
4436 ctx.keylog_filename = None
4437 self.assertEqual(ctx.keylog_filename, None)
4438
4439 with self.assertRaises((IsADirectoryError, PermissionError)):
4440 # Windows raises PermissionError
4441 ctx.keylog_filename = os.path.dirname(
4442 os.path.abspath(support.TESTFN))
4443
4444 with self.assertRaises(TypeError):
4445 ctx.keylog_filename = 1
4446
4447 @requires_keylog
4448 def test_keylog_filename(self):
4449 self.addCleanup(support.unlink, support.TESTFN)
4450 client_context, server_context, hostname = testing_context()
4451
4452 client_context.keylog_filename = support.TESTFN
4453 server = ThreadedEchoServer(context=server_context, chatty=False)
4454 with server:
4455 with client_context.wrap_socket(socket.socket(),
4456 server_hostname=hostname) as s:
4457 s.connect((HOST, server.port))
4458 # header, 5 lines for TLS 1.3
4459 self.assertEqual(self.keylog_lines(), 6)
4460
4461 client_context.keylog_filename = None
4462 server_context.keylog_filename = support.TESTFN
4463 server = ThreadedEchoServer(context=server_context, chatty=False)
4464 with server:
4465 with client_context.wrap_socket(socket.socket(),
4466 server_hostname=hostname) as s:
4467 s.connect((HOST, server.port))
4468 self.assertGreaterEqual(self.keylog_lines(), 11)
4469
4470 client_context.keylog_filename = support.TESTFN
4471 server_context.keylog_filename = support.TESTFN
4472 server = ThreadedEchoServer(context=server_context, chatty=False)
4473 with server:
4474 with client_context.wrap_socket(socket.socket(),
4475 server_hostname=hostname) as s:
4476 s.connect((HOST, server.port))
4477 self.assertGreaterEqual(self.keylog_lines(), 21)
4478
4479 client_context.keylog_filename = None
4480 server_context.keylog_filename = None
4481
4482 @requires_keylog
4483 @unittest.skipIf(sys.flags.ignore_environment,
4484 "test is not compatible with ignore_environment")
4485 def test_keylog_env(self):
4486 self.addCleanup(support.unlink, support.TESTFN)
4487 with unittest.mock.patch.dict(os.environ):
4488 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4489 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4490
4491 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4492 self.assertEqual(ctx.keylog_filename, None)
4493
4494 ctx = ssl.create_default_context()
4495 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4496
4497 ctx = ssl._create_stdlib_context()
4498 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4499
4500 def test_msg_callback(self):
4501 client_context, server_context, hostname = testing_context()
4502
4503 def msg_cb(conn, direction, version, content_type, msg_type, data):
4504 pass
4505
4506 self.assertIs(client_context._msg_callback, None)
4507 client_context._msg_callback = msg_cb
4508 self.assertIs(client_context._msg_callback, msg_cb)
4509 with self.assertRaises(TypeError):
4510 client_context._msg_callback = object()
4511
4512 def test_msg_callback_tls12(self):
4513 client_context, server_context, hostname = testing_context()
4514 client_context.options |= ssl.OP_NO_TLSv1_3
4515
4516 msg = []
4517
4518 def msg_cb(conn, direction, version, content_type, msg_type, data):
4519 self.assertIsInstance(conn, ssl.SSLSocket)
4520 self.assertIsInstance(data, bytes)
4521 self.assertIn(direction, {'read', 'write'})
4522 msg.append((direction, version, content_type, msg_type))
4523
4524 client_context._msg_callback = msg_cb
4525
4526 server = ThreadedEchoServer(context=server_context, chatty=False)
4527 with server:
4528 with client_context.wrap_socket(socket.socket(),
4529 server_hostname=hostname) as s:
4530 s.connect((HOST, server.port))
4531
4532 self.assertEqual(msg, [
4533 ("write", TLSVersion.TLSv1, _TLSContentType.HEADER,
4534 _TLSMessageType.CERTIFICATE_STATUS),
4535 ("write", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4536 _TLSMessageType.CLIENT_HELLO),
4537 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4538 _TLSMessageType.CERTIFICATE_STATUS),
4539 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4540 _TLSMessageType.SERVER_HELLO),
4541 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4542 _TLSMessageType.CERTIFICATE_STATUS),
4543 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4544 _TLSMessageType.CERTIFICATE),
4545 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4546 _TLSMessageType.CERTIFICATE_STATUS),
4547 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4548 _TLSMessageType.SERVER_KEY_EXCHANGE),
4549 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4550 _TLSMessageType.CERTIFICATE_STATUS),
4551 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4552 _TLSMessageType.SERVER_DONE),
4553 ("write", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4554 _TLSMessageType.CERTIFICATE_STATUS),
4555 ("write", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4556 _TLSMessageType.CLIENT_KEY_EXCHANGE),
4557 ("write", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4558 _TLSMessageType.FINISHED),
4559 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4560 _TLSMessageType.CHANGE_CIPHER_SPEC),
4561 ("write", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4562 _TLSMessageType.CERTIFICATE_STATUS),
4563 ("write", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4564 _TLSMessageType.FINISHED),
4565 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4566 _TLSMessageType.CERTIFICATE_STATUS),
4567 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4568 _TLSMessageType.NEWSESSION_TICKET),
4569 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4570 _TLSMessageType.FINISHED),
4571 ("read", TLSVersion.TLSv1_2, _TLSContentType.HEADER,
4572 _TLSMessageType.CERTIFICATE_STATUS),
4573 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4574 _TLSMessageType.FINISHED),
4575 ])
4576
4577
Thomas Woutersed03b412007-08-28 21:37:11 +00004578def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004579 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004580 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004581 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004582 'Mac': platform.mac_ver,
4583 'Windows': platform.win32_ver,
4584 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004585 for name, func in plats.items():
4586 plat = func()
4587 if plat and plat[0]:
4588 plat = '%s %r' % (name, plat)
4589 break
4590 else:
4591 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004592 print("test_ssl: testing with %r %r" %
4593 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4594 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004595 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004596 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4597 try:
4598 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4599 except AttributeError:
4600 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004601
Antoine Pitrou152efa22010-05-16 18:19:27 +00004602 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004603 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004604 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004605 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004606 BADCERT, BADKEY, EMPTYCERT]:
4607 if not os.path.exists(filename):
4608 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004609
Martin Panter3840b2a2016-03-27 01:53:46 +00004610 tests = [
4611 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004612 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004613 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004614 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004615
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004616 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004617 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004618
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004619 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004620 try:
4621 support.run_unittest(*tests)
4622 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004623 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004624
4625if __name__ == "__main__":
4626 test_main()