blob: 7ba8156eef5da8a640b6a121937f409ff86436ab [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Christian Heimesc7f70692019-05-31 11:44:05 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType, _TLSAlertType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
# Sorted entries of ssl._PROTOCOL_NAMES and the host taken from test.support.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST

# Flags describing which TLS library (and which version) ssl is linked with.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')

# Maps PROTOCOL_* constants to TLSVersion members.  A pair is left out when
# either attribute is missing from this build of the ssl module.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in (
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
):
    try:
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        continue
    else:
        PROTOCOL_TO_TLS_VERSION[proto] = ver
50
def data_file(*name):
    """Return a path built from this module's directory and *name*.

    The positional components in *name* are joined, in order, onto the
    directory that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000053
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key + certificate, and the same material split into two files.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Password-protected variants and the password used with them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and two individual CA files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
72
# Decoded form that test_parse_cert expects for CERTFILE.
CERTFILE_INFO = dict(
    issuer=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    notAfter='Aug 26 14:23:15 2028 GMT',
    notBefore='Aug 29 14:23:15 2018 GMT',
    serialNumber='98A7CF88C74A32ED',
    subject=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    subjectAltName=(('DNS', 'localhost'),),
    version=3,
)
Antoine Pitrou152efa22010-05-16 18:19:27 +000088
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests); this is the
# first of them, together with the host name paired with it.
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010095
# Decoded form that test_parse_cert expects for SIGNED_CERTFILE.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # ASN1_TIME_print() pads a single-digit day with a leading space, so the
    # day field is two characters wide ("Jul  7", not "Jul 7").
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3,
}
113
# Second CA-signed key/cert pair and an ECC pair, each with its host name.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Certificate and key files used by the parsing and error-path tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

# Diffie-Hellman parameter file, as str and as bytes.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000137
# Not defined in all versions of OpenSSL; each name falls back to 0 when the
# ssl module does not provide it.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(
    ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(
    ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200144
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100145
def handle_error(prefix):
    """Write *prefix* followed by the current traceback to stdout.

    The traceback of the exception currently being handled is formatted
    unconditionally, but it is only written when ``support.verbose`` is true.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted = ' '.join(
        traceback.format_exception(exc_type, exc_value, exc_tb))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000150
def can_clear_options():
    """Return True when ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum
Antoine Pitroub5218772010-05-21 09:56:06 +0000154
def no_sslv2_implies_sslv3_hello():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum
158
def have_verify_flags():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum
162
def _have_secp_curves():
    """Return True when a server context accepts the "secp384r1" curve.

    Returns False when ssl.HAS_ECDH is false, or when set_ecdh_curve()
    rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        server_ctx.set_ecdh_curve("secp384r1")
    except ValueError:
        supported = False
    else:
        supported = True
    return supported


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
176
177
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset (time.altzone) is
    used when the zone defines DST and it is in effect right now; otherwise
    the standard offset (time.timezone) is used.
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
183
def asn1time(cert_time):
    """Adjust *cert_time* for OpenSSL builds that ignore seconds.

    Only the 0.9.8i API version is affected (see #18207).  For it, the
    seconds are zeroed and a zero-padded day is re-padded with a space.
    Every other version gets *cert_time* back unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000197
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
199
Antoine Pitrou23df4832010-08-04 17:14:06 +0000200
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a fresh SSLContext configured from the arguments.

    A context is created for *ssl_version*.  Each optional setting
    (verification mode, CA certificates, certificate chain, ciphers) is
    applied only when its argument is not None.  Remaining keyword arguments
    are passed to SSLContext.wrap_socket(), whose result is returned.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # Hostname checking is switched off before selecting CERT_NONE.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    wrapped = ctx.wrap_socket(sock, **kwargs)
    return wrapped
217
Christian Heimesa170fa12017-09-15 20:27:30 +0200218
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching pair of client and server contexts.

    Usage:

        client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one paired with
    the chosen certificate.
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for known_cert, known_hostname in known_certs:
        if server_cert == known_cert:
            hostname = known_hostname
            break
    else:
        raise ValueError(server_cert)

    # Client side: trusts the signing CA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    # Server side: presents server_cert and also loads the signing CA.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
239
240
Antoine Pitrou152efa22010-05-16 18:19:27 +0000241class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000242
def test_constants(self):
    """Check that the expected ssl module constants can be looked up."""
    # A missing attribute raises AttributeError and fails the test.
    for name in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
                 "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE"):
        getattr(ssl, name)
    if ssl.HAS_ECDH:
        getattr(ssl, "OP_SINGLE_ECDH_USE")
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        getattr(ssl, "OP_NO_COMPRESSION")
    for flag in (ssl.HAS_SNI, ssl.HAS_ECDH):
        self.assertIn(flag, {True, False})
    for name in ("OP_NO_SSLv2", "OP_NO_SSLv3", "OP_NO_TLSv1",
                 "OP_NO_TLSv1_3"):
        getattr(ssl, name)
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        getattr(ssl, "OP_NO_TLSv1_1")
        getattr(ssl, "OP_NO_TLSv1_2")
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000263
def test_private_init(self):
    """Calling ssl.SSLSocket() directly must raise TypeError."""
    expect_error = self.assertRaisesRegex(TypeError, "public constructor")
    with expect_error, socket.socket() as plain_sock:
        ssl.SSLSocket(plain_sock)
268
def test_str_for_enums(self):
    """PROTOCOL_* constants have enum-like string representations."""
    method = ssl.PROTOCOL_TLS
    self.assertEqual(str(method), '_SSLMethod.PROTOCOL_TLS')
    # The context hands back the very same object as its protocol.
    context = ssl.SSLContext(method)
    self.assertIs(context.protocol, method)
276
def test_random(self):
    """Exercise the RAND_* functions of the ssl module."""
    v = ssl.RAND_status()
    if support.verbose:
        if v:
            quality = "sufficient randomness"
        else:
            quality = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, quality))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if not v:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)

    # negative num is invalid
    for rand_func in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, rand_func, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)

    # RAND_add is called with str, bytes and bytearray input.
    for entropy in ("this is a random string",
                    b"this is a random bytes object",
                    bytearray(b"this is a random bytearray object")):
        ssl.RAND_add(entropy, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000303
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    """A forked child and its parent get different pseudo-random bytes."""
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send 16 pseudo-random bytes through the pipe, then leave
        # through os._exit() on every path (status 1 on any error).
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)

    # Parent: only reached when pid != 0, because the child never returns.
    os.close(wfd)
    self.addCleanup(os.close, rfd)
    _, exit_status = os.waitpid(pid, 0)
    self.assertEqual(exit_status, 0)

    child_random = os.read(rfd, 16)
    self.assertEqual(len(child_random), 16)
    parent_random = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(parent_random), 16)

    self.assertNotEqual(child_random, parent_random)
335
# Do not truncate diffs in assertion failure messages.
maxDiff = None

def test_parse_cert(self):
    """Decode certificates on disk and compare with the expected dicts."""
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    for cert_path, expected in ((CERTFILE, CERTFILE_INFO),
                                (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO)):
        self.assertEqual(decode(cert_path), expected)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    expected_san = (('DNS', 'projects.developer.nokia.com'),
                    ('DNS', 'projects.forum.nokia.com'))
    self.assertEqual(p['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        p['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        p['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
def test_parse_cert_CVE_2019_5010(self):
    """Decode the TALOS_INVALID_CRLDP certificate and check the result."""
    expected = {
        'issuer': (
            (('countryName', 'UK'),),
            (('commonName', 'cody-ca'),),
        ),
        'notAfter': 'Jun 14 18:00:58 2028 GMT',
        'notBefore': 'Jun 18 18:00:58 2018 GMT',
        'serialNumber': '02',
        'subject': (
            (('countryName', 'UK'),),
            (('commonName', 'codenomicon-vm-2.test.lal.cisco.com'),),
        ),
        'subjectAltName': (
            ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),
        ),
        'version': 3,
    }
    p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p, expected)
387
def test_parse_cert_CVE_2013_4238(self):
    """Embedded NUL bytes in certificate names are kept when decoding."""
    p = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")

    # Subject and issuer are expected to be the same name.
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    self.assertEqual(p['subject'], expected_name)
    self.assertEqual(p['issuer'], expected_name)

    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        last_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        last_entry = ('IP Address', '<invalid>')
    san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        last_entry,
    )

    self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200416
def test_parse_all_sans(self):
    """Decode ALLSANFILE and check every subjectAltName entry."""
    dirname_value = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', dirname_value),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    p = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(p['subjectAltName'], expected_san)
437
def test_DER_to_PEM(self):
    """Round-trip a certificate through PEM -> DER -> PEM -> DER."""
    with open(CAFILE_CACERT, 'r') as pem_file:
        pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    # The regenerated PEM text must be framed by the standard header/footer.
    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000449
def test_openssl_version(self):
    """Sanity-check the three OPENSSL_VERSION* constants."""
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    self.assertIsInstance(number, int)
    self.assertIsInstance(info, tuple)
    self.assertIsInstance(text, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix),
                    (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000480
@support.cpython_only
def test_refcycle(self):
    """Issue #7943: an SSL object doesn't create reference cycles with itself."""
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(plain_sock)
    ref = weakref.ref(wrapped)
    # Dropping the only strong reference must leave the weak reference dead.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000491
def test_wrapped_unconnected(self):
    """Check which errors an unconnected SSLSocket raises.

    Methods on an unconnected SSLSocket propagate the original OSError
    raised by the underlying socket object; dup(), sendmsg(), recvmsg()
    and recvmsg_into() raise NotImplementedError.
    """
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock) as ss:
        expectations = (
            (OSError, ss.recv, (1,)),
            (OSError, ss.recv_into, (bytearray(b'x'),)),
            (OSError, ss.recvfrom, (1,)),
            (OSError, ss.recvfrom_into, (bytearray(b'x'), 1)),
            (OSError, ss.send, (b'x',)),
            (OSError, ss.sendto, (b'x', ('0.0.0.0', 0))),
            (NotImplementedError, ss.dup, ()),
            (NotImplementedError, ss.sendmsg,
             ([b'x'], (), 0, ('0.0.0.0', 0))),
            (NotImplementedError, ss.recvmsg, (100,)),
            (NotImplementedError, ss.recvmsg_into, ([bytearray(100)],)),
        )
        for expected_exc, method, args in expectations:
            self.assertRaises(expected_exc, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000509
def test_timeout(self):
    """Issue #8524: wrapping a socket keeps the original socket's timeout."""
    timeouts = (None, 0.0, 5.0)
    for timeout in timeouts:
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(timeout)
        with test_wrap_socket(plain_sock) as wrapped:
            self.assertEqual(timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000518
def test_errors_sslwrap(self):
    """ssl.wrap_socket() rejects inconsistent or unusable cert arguments."""
    sock = socket.socket()

    # A key without a certificate is refused.
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)

    # Server-side mode needs a non-empty certfile.
    server_msg = "certfile must be specified for server-side operations"
    with self.assertRaisesRegex(ValueError, server_msg):
        ssl.wrap_socket(sock, server_side=True)
    with self.assertRaisesRegex(ValueError, server_msg):
        ssl.wrap_socket(sock, server_side=True, certfile="")

    # A server-side socket cannot be used to connect.
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))

    # Missing certificate and/or key files surface as OSError(ENOENT).
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000547
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails.

    *certfile* is joined onto this module's directory, or onto os.curdir
    when that directory name is empty.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000557
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    self.bad_cert_test(certfile="nullcert.pem")
561
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    self.bad_cert_test(certfile="badcert.pem")
565
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    self.bad_cert_test(certfile="badkey.pem")
569
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000570 def test_match_hostname(self):
571 def ok(cert, hostname):
572 ssl.match_hostname(cert, hostname)
573 def fail(cert, hostname):
574 self.assertRaises(ssl.CertificateError,
575 ssl.match_hostname, cert, hostname)
576
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100577 # -- Hostname matching --
578
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000579 cert = {'subject': ((('commonName', 'example.com'),),)}
580 ok(cert, 'example.com')
581 ok(cert, 'ExAmple.cOm')
582 fail(cert, 'www.example.com')
583 fail(cert, '.example.com')
584 fail(cert, 'example.org')
585 fail(cert, 'exampleXcom')
586
587 cert = {'subject': ((('commonName', '*.a.com'),),)}
588 ok(cert, 'foo.a.com')
589 fail(cert, 'bar.foo.a.com')
590 fail(cert, 'a.com')
591 fail(cert, 'Xa.com')
592 fail(cert, '.a.com')
593
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 # only match wildcards when they are the only thing
595 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000596 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 fail(cert, 'foo.com')
598 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 fail(cert, 'bar.com')
600 fail(cert, 'foo.a.com')
601 fail(cert, 'bar.foo.com')
602
Christian Heimes824f7f32013-08-17 00:54:47 +0200603 # NULL bytes are bad, CVE-2013-4073
604 cert = {'subject': ((('commonName',
605 'null.python.org\x00example.org'),),)}
606 ok(cert, 'null.python.org\x00example.org') # or raise an error?
607 fail(cert, 'example.org')
608 fail(cert, 'null.python.org')
609
Georg Brandl72c98d32013-10-27 07:16:53 +0100610 # error cases with wildcards
611 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
612 fail(cert, 'bar.foo.a.com')
613 fail(cert, 'a.com')
614 fail(cert, 'Xa.com')
615 fail(cert, '.a.com')
616
617 cert = {'subject': ((('commonName', 'a.*.com'),),)}
618 fail(cert, 'a.foo.com')
619 fail(cert, 'a..com')
620 fail(cert, 'a.com')
621
622 # wildcard doesn't match IDNA prefix 'xn--'
623 idna = 'püthon.python.org'.encode("idna").decode("ascii")
624 cert = {'subject': ((('commonName', idna),),)}
625 ok(cert, idna)
626 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
627 fail(cert, idna)
628 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
629 fail(cert, idna)
630
631 # wildcard in first fragment and IDNA A-labels in sequent fragments
632 # are supported.
633 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
634 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530635 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100637 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
639
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000640 # Slightly fake real-world example
641 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
642 'subject': ((('commonName', 'linuxfrz.org'),),),
643 'subjectAltName': (('DNS', 'linuxfr.org'),
644 ('DNS', 'linuxfr.com'),
645 ('othername', '<unsupported>'))}
646 ok(cert, 'linuxfr.org')
647 ok(cert, 'linuxfr.com')
648 # Not a "DNS" entry
649 fail(cert, '<unsupported>')
650 # When there is a subjectAltName, commonName isn't used
651 fail(cert, 'linuxfrz.org')
652
653 # A pristine real-world example
654 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),),
659 (('commonName', 'mail.google.com'),))}
660 ok(cert, 'mail.google.com')
661 fail(cert, 'gmail.com')
662 # Only commonName is considered
663 fail(cert, 'California')
664
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100665 # -- IPv4 matching --
666 cert = {'subject': ((('commonName', 'example.com'),),),
667 'subjectAltName': (('DNS', 'example.com'),
668 ('IP Address', '10.11.12.13'),
669 ('IP Address', '14.15.16.17'))}
670 ok(cert, '10.11.12.13')
671 ok(cert, '14.15.16.17')
672 fail(cert, '14.15.16.18')
673 fail(cert, 'example.net')
674
675 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100676 if hasattr(socket, 'AF_INET6'):
677 cert = {'subject': ((('commonName', 'example.com'),),),
678 'subjectAltName': (
679 ('DNS', 'example.com'),
680 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
681 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
682 ok(cert, '2001::cafe')
683 ok(cert, '2003::baba')
684 fail(cert, '2003::bebe')
685 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100686
687 # -- Miscellaneous --
688
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000689 # Neither commonName nor subjectAltName
690 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
691 'subject': ((('countryName', 'US'),),
692 (('stateOrProvinceName', 'California'),),
693 (('localityName', 'Mountain View'),),
694 (('organizationName', 'Google Inc'),))}
695 fail(cert, 'mail.google.com')
696
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200697 # No DNS entry in subjectAltName but a commonName
698 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
699 'subject': ((('countryName', 'US'),),
700 (('stateOrProvinceName', 'California'),),
701 (('localityName', 'Mountain View'),),
702 (('commonName', 'mail.google.com'),)),
703 'subjectAltName': (('othername', 'blabla'), )}
704 ok(cert, 'mail.google.com')
705
706 # No DNS entry subjectAltName and no commonName
707 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
708 'subject': ((('countryName', 'US'),),
709 (('stateOrProvinceName', 'California'),),
710 (('localityName', 'Mountain View'),),
711 (('organizationName', 'Google Inc'),)),
712 'subjectAltName': (('othername', 'blabla'),)}
713 fail(cert, 'google.com')
714
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000715 # Empty cert / no cert
716 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
717 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
718
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200719 # Issue #17980: avoid denials of service by refusing more than one
720 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100721 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
722 with self.assertRaisesRegex(
723 ssl.CertificateError,
724 "partial wildcards in leftmost label are not supported"):
725 ssl.match_hostname(cert, 'axxb.example.com')
726
727 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
728 with self.assertRaisesRegex(
729 ssl.CertificateError,
730 "wildcard can only be present in the leftmost label"):
731 ssl.match_hostname(cert, 'www.sub.example.com')
732
733 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
734 with self.assertRaisesRegex(
735 ssl.CertificateError,
736 "too many wildcards"):
737 ssl.match_hostname(cert, 'axxbxxc.example.com')
738
739 cert = {'subject': ((('commonName', '*'),),)}
740 with self.assertRaisesRegex(
741 ssl.CertificateError,
742 "sole wildcard without additional labels are not support"):
743 ssl.match_hostname(cert, 'host')
744
745 cert = {'subject': ((('commonName', '*.com'),),)}
746 with self.assertRaisesRegex(
747 ssl.CertificateError,
748 r"hostname 'com' doesn't match '\*.com'"):
749 ssl.match_hostname(cert, 'com')
750
751 # extra checks for _inet_paton()
752 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
753 with self.assertRaises(ValueError):
754 ssl._inet_paton(invalid)
755 for ipaddr in ['127.0.0.1', '192.168.0.1']:
756 self.assertTrue(ssl._inet_paton(ipaddr))
757 if hasattr(socket, 'AF_INET6'):
758 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
759 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200760
def test_server_side(self):
    # Wrapping a socket as the server side while also passing
    # server_hostname must be rejected with ValueError.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with socket.socket() as raw_sock:
        with self.assertRaises(ValueError):
            server_ctx.wrap_socket(raw_sock, True,
                                   server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000767
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.create_server(('127.0.0.1', 0))
    # Register the cleanups up front: they run even when connect(), the
    # wrap or the assertion below fails, so neither socket is leaked.
    self.addCleanup(listener.close)
    client = socket.socket(socket.AF_INET)
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200777
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected client-side socket reports None for a known type.
    client_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_sock) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
    # The same holds when the socket is wrapped as the server side.
    server_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_sock, server_side=True,
                          certfile=CERTFILE) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200789
def test_dealloc_warn(self):
    # Dropping the only reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
797
def test_get_default_verify_paths(self):
    default_paths = ssl.get_default_verify_paths()
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)
    self.assertEqual(len(default_paths), 6)

    # With SSL_CERT_DIR / SSL_CERT_FILE set, the reported capath and
    # cafile must be the values taken from the environment.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
809
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    store_names = ("CA", "ROOT")
    known_encodings = {"x509_asn", "pkcs_7_asn"}

    # Both system stores are expected to be non-empty.
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    # The store name is mandatory and must name an existing store.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Each entry is a (cert bytes, encoding name, trust) triple; collect
    # every OID that appears in a set-valued trust field.
    seen_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            der_bytes, encoding, trust = entry
            self.assertIsInstance(der_bytes, bytes)
            self.assertIn(encoding, known_encodings)
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200834
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    # The store name is mandatory and must name an existing store.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Each entry is a (CRL bytes, encoding name) pair.
    known_encodings = {"x509_asn", "pkcs_7_asn"}
    entries = ssl.enum_crls("CA")
    self.assertIsInstance(entries, list)
    for entry in entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200848
Christian Heimes46bebee2013-06-09 19:03:31 +0200849
def test_asn1object(self):
    # Expected (nid, shortname, longname, oid) tuple for serverAuth.
    expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                '1.3.6.1.5.5.7.3.1')

    # Construction from a dotted OID string.
    by_oid = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
    self.assertEqual(by_oid, expected)
    self.assertEqual(by_oid.nid, 129)
    self.assertEqual(by_oid.shortname, 'serverAuth')
    self.assertEqual(by_oid.longname, 'TLS Web Server Authentication')
    self.assertEqual(by_oid.oid, '1.3.6.1.5.5.7.3.1')
    self.assertIsInstance(by_oid, ssl._ASN1Object)
    # A short name is not accepted by the constructor.
    with self.assertRaises(ValueError):
        ssl._ASN1Object('serverAuth')

    # Construction from a numeric NID.
    by_nid = ssl._ASN1Object.fromnid(129)
    self.assertEqual(by_nid, expected)
    self.assertIsInstance(by_nid, ssl._ASN1Object)
    with self.assertRaises(ValueError):
        ssl._ASN1Object.fromnid(-1)
    with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
        ssl._ASN1Object.fromnid(100000)
    # Every NID below 1000 that resolves must expose correctly typed
    # fields; NIDs that do not resolve are ignored.
    for nid in range(1000):
        try:
            resolved = ssl._ASN1Object.fromnid(nid)
        except ValueError:
            continue
        self.assertIsInstance(resolved.nid, int)
        self.assertIsInstance(resolved.shortname, str)
        self.assertIsInstance(resolved.longname, str)
        self.assertIsInstance(resolved.oid, (str, type(None)))

    # Lookup by long name, short name and OID string.
    by_name = ssl._ASN1Object.fromname('TLS Web Server Authentication')
    self.assertEqual(by_name, expected)
    self.assertIsInstance(by_name, ssl._ASN1Object)
    for alias in ('serverAuth', '1.3.6.1.5.5.7.3.1'):
        self.assertEqual(ssl._ASN1Object.fromname(alias), expected)
    # The all-lowercase spelling is reported as an unknown object.
    with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
        ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100888
def test_purpose_enum(self):
    # Each Purpose member must be an _ASN1Object equal to the object
    # built from its OID, with the expected nid and short name.
    expectations = (
        (ssl.Purpose.SERVER_AUTH, '1.3.6.1.5.5.7.3.1', 129, 'serverAuth'),
        (ssl.Purpose.CLIENT_AUTH, '1.3.6.1.5.5.7.3.2', 130, 'clientAuth'),
    )
    for purpose, oid, nid, shortname in expectations:
        from_oid = ssl._ASN1Object(oid)
        self.assertIsInstance(purpose, ssl._ASN1Object)
        self.assertEqual(purpose, from_oid)
        self.assertEqual(purpose.nid, nid)
        self.assertEqual(purpose.shortname, shortname)
        self.assertEqual(purpose.oid, oid)
905
def test_unsupported_dtls(self):
    # Datagram sockets cannot be wrapped, neither through the module's
    # wrapping helper nor through SSLContext.wrap_socket().
    expected_message = "only stream sockets are supported"
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as helper_failure:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(helper_failure.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as context_failure:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(context_failure.exception), expected_message)
916
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must convert to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
919
def cert_time_fail(self, timestring):
    # Helper: converting *timestring* must raise ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
923
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, expected_seconds in known_conversions:
        self.cert_time_ok(cert_time, expected_seconds)
930 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
931
def test_cert_time_to_seconds(self):
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # The argument may be given by keyword; its name is cert_time.
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)

    alternate_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in alternate_spellings:
        self.cert_time_ok(spelling, reference_ts)

    rejected = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for malformed in rejected:
        self.cert_time_fail(malformed)

    # A leap second and the first second of the new year share one
    # timestamp.
    newyear_ts = 1230768000.0
    for cert_time in ("Dec 31 23:59:60 2008 GMT",
                      "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(cert_time, newyear_ts)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # allow 60th second (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # allow 2nd leap second for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
966
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as formatted under the active locale.
    february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is accepted; the locale-specific spelling
    # is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february + " 9 00:00:00 2007 GMT")
981
def test_connect_ex_error(self):
    # Reserve a port without listening on it, then check that
    # connect_ex() returns one of the expected error codes.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
997
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100998
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999class ContextTests(unittest.TestCase):
1000
def test_constructor(self):
    # Every known protocol constant is accepted.
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    # Without an argument the context reports PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
1008
def test_protocol(self):
    # The protocol attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        reported = ssl.SSLContext(requested).protocol
        self.assertEqual(reported, requested)
1013
def test_ciphers(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Well-known cipher strings are accepted.
    for cipher_string in ("ALL", "DEFAULT"):
        client_ctx.set_ciphers(cipher_string)
    # A string that selects nothing is reported as an SSLError.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        client_ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001020
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in any cipher suite name of a
    # freshly created client context.
    excluded_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in excluded_markers:
            self.assertNotIn(marker, suite_name)
1033
@unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
def test_get_ciphers(self):
    # After restricting the context to 'AESGCM', both AES-GCM suites
    # below must be listed by get_ciphers().
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.set_ciphers('AESGCM')
    suite_names = {suite['name'] for suite in client_ctx.get_ciphers()}
    for required in ('AES256-GCM-SHA384', 'AES128-GCM-SHA256'):
        self.assertIn(required, suite_names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041
def test_options(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    expected |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                 OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(expected, ctx.options)

    # Setting an extra option is reflected when reading back.
    ctx.options = ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Clearing options is expected to raise ValueError here.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options &= ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1062
def test_verify_mode_protocol(self):
    generic_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    # Default value
    self.assertEqual(generic_ctx.verify_mode, ssl.CERT_NONE)
    # Every valid mode reads back exactly as assigned.
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        generic_ctx.verify_mode = mode
        self.assertEqual(generic_ctx.verify_mode, mode)
    # Wrong type and out-of-range value are rejected differently.
    with self.assertRaises(TypeError):
        generic_ctx.verify_mode = None
    with self.assertRaises(ValueError):
        generic_ctx.verify_mode = 42

    # A PROTOCOL_TLS_SERVER context starts without verification.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_ctx.verify_mode, ssl.CERT_NONE)
    self.assertFalse(server_ctx.check_hostname)

    # A PROTOCOL_TLS_CLIENT context starts with full verification.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(client_ctx.check_hostname)
1085
def test_hostname_checks_common_name(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # The attribute is true on a fresh client context.
    self.assertTrue(client_ctx.hostname_checks_common_name)

    if not ssl.HAS_NEVER_CHECK_COMMON_NAME:
        # When the flag is false, assignment must raise AttributeError.
        with self.assertRaises(AttributeError):
            client_ctx.hostname_checks_common_name = True
        return

    # Otherwise the attribute can be toggled and reads back as set.
    for enabled in (True, False, True):
        client_ctx.hostname_checks_common_name = enabled
        if enabled:
            self.assertTrue(client_ctx.hostname_checks_common_name)
        else:
            self.assertFalse(client_ctx.hostname_checks_common_name)
1098 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001099
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    versions = ssl.TLSVersion
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    accepted_default_minimums = {
        versions.MINIMUM_SUPPORTED,
        # Fedora 29 uses TLS 1.0 by default
        versions.TLSv1,
        # RHEL 8 uses TLS 1.2 by default
        versions.TLSv1_2,
    }
    self.assertIn(ctx.minimum_version, accepted_default_minimums)
    self.assertEqual(ctx.maximum_version, versions.MAXIMUM_SUPPORTED)

    # Explicit bounds read back exactly as assigned.
    explicit_bounds = (
        (versions.TLSv1_1, versions.TLSv1_2),
        (versions.MINIMUM_SUPPORTED, versions.TLSv1),
    )
    for lower, upper in explicit_bounds:
        ctx.minimum_version = lower
        ctx.maximum_version = upper
        self.assertEqual(ctx.minimum_version, lower)
        self.assertEqual(ctx.maximum_version, upper)

    ctx.maximum_version = versions.MAXIMUM_SUPPORTED
    self.assertEqual(ctx.maximum_version, versions.MAXIMUM_SUPPORTED)

    # MINIMUM_SUPPORTED as the upper bound reads back as a concrete
    # version.
    ctx.maximum_version = versions.MINIMUM_SUPPORTED
    self.assertIn(ctx.maximum_version, {versions.TLSv1, versions.SSLv3})

    # MAXIMUM_SUPPORTED as the lower bound reads back as a concrete
    # version.
    ctx.minimum_version = versions.MAXIMUM_SUPPORTED
    self.assertIn(ctx.minimum_version,
                  {versions.TLSv1_2, versions.TLSv1_3})

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created with PROTOCOL_TLSv1_1 reports the widest range
    # and rejects assignment to either bound.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
    self.assertEqual(pinned_ctx.minimum_version,
                     versions.MINIMUM_SUPPORTED)
    self.assertEqual(pinned_ctx.maximum_version,
                     versions.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = versions.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = versions.TLSv1
1168
1169
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Single flags, the default, and a combination of flags all read
    # back exactly as assigned.
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1189
def test_load_cert_chain(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    for keyfile in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=keyfile)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(unusable)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: str, bytes and bytearray
    # passwords are accepted, by keyword and positionally.
    encoded_password = KEY_PASSWORD.encode()
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=encoded_password)
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=bytearray(encoded_password))
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, encoded_password)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                        bytearray(encoded_password))
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    class PasswordSource:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    def failing_callback():
        raise Exception('getpass error')

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        PasswordSource(),
        PasswordSource().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    # Callbacks returning a wrong, oversized or wrongly typed password,
    # and a callback that raises.
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001272
def test_load_verify_locations(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given as str or bytes, positionally or by
    # keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        server_ctx.load_verify_locations(cafile)
        server_ctx.load_verify_locations(cafile=cafile, capath=None)
    # At least one location is required.
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        server_ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        server_ctx.load_verify_locations(BADCERT)
    # A CA directory may accompany the file, as str or bytes.
    server_ctx.load_verify_locations(CERTFILE, CAPATH)
    server_ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, True)
1291
def test_load_verify_cadata(self):
    # Check the `cadata` argument of load_verify_locations() with PEM
    # text and DER bytes, using the "x509_ca" counter of
    # cert_store_stats() to observe what was actually added.
    def read_text(path):
        with open(path) as f:
            return f.read()

    def ca_count(context):
        return context.cert_store_stats()["x509_ca"]

    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ca_count(ctx), 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(ctx), 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    ctx.load_verify_locations(cadata=pem_bundle)
    self.assertEqual(ca_count(ctx), 2)

    # with junk around the certs
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    self.assertEqual(ca_count(ctx), 2)

    # test DER
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for der in (cacert_der, neuronio_der):
        ctx.load_verify_locations(cadata=der)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    der_bundle = b"".join((cacert_der, neuronio_der))
    ctx.load_verify_locations(cadata=der_bundle)
    self.assertEqual(ca_count(ctx), 2)

    # error cases
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    # A str is parsed as PEM, a bytes object as DER; both are garbage here.
    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1348
1349
def test_load_dh_params(self):
    # Exercise SSLContext.load_dh_params() with valid and invalid input.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is only exercised off Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)

    # A missing file must be reported with ENOENT.
    with self.assertRaises(FileNotFoundError) as caught:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)

    # A certificate file is not a DH parameter file.
    self.assertRaises(ssl.SSLError, context.load_dh_params, CERTFILE)
1362
def test_session_stats(self):
    # A freshly created context must report zero for every session
    # statistic, for each protocol listed in PROTOCOLS.
    counters = (
        'number',
        'connect', 'connect_good', 'connect_renegotiate',
        'accept', 'accept_good', 'accept_renegotiate',
        'hits', 'misses', 'timeouts', 'cache_full',
    )
    pristine = dict.fromkeys(counters, 0)
    for proto in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(proto).session_stats(), pristine)
1379
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001380 def test_set_default_verify_paths(self):
1381 # There's not much we can do to test that it acts as expected,
1382 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001383 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001384 ctx.set_default_verify_paths()
1385
Antoine Pitrou501da612011-12-21 09:27:41 +01001386 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001387 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001388 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001389 ctx.set_ecdh_curve("prime256v1")
1390 ctx.set_ecdh_curve(b"prime256v1")
1391 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1392 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1393 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1394 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1395
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass

    # Both None and a real callable must be accepted.
    for accepted in (None, dummycallback):
        context.set_servername_callback(accepted)
1410
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback reference the context, and
    # the context references the callback once it is installed: a cycle.
    def callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)
    # Drop every strong reference held by this frame, then let the
    # garbage collector break the cycle.
    del context, callback
    gc.collect()
    self.assertIs(context_ref(), None)
1423
def test_cert_store_stats(self):
    # Track how cert_store_stats() evolves as certificates are loaded.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect(ca_certs, certs):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': ca_certs, 'crl': 0, 'x509': certs})

    expect(0, 0)
    # Loading the context's own certificate chain leaves the store alone.
    context.load_cert_chain(CERTFILE)
    expect(0, 0)
    # CERTFILE is counted as a plain certificate, not as a CA.
    context.load_verify_locations(CERTFILE)
    expect(0, 1)
    # CAFILE_CACERT bumps both the total and the CA counter.
    context.load_verify_locations(CAFILE_CACERT)
    expect(1, 2)
1437
def test_get_ca_certs(self):
    # get_ca_certs() must list only CA certificates, as decoded dicts
    # by default and as DER bytes when called with True.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # Issuer and subject of the expected certificate are the same name.
    root_name = ((('organizationName', 'Root CA'),),
                 (('organizationalUnitName', 'http://www.cacert.org'),),
                 (('commonName', 'CA Cert Signing Authority'),),
                 (('emailAddress', 'support@cacert.org'),))
    expected_cert = {
        'issuer': root_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # Binary form: the DER encoding of the same certificate.
    with open(CAFILE_CACERT) as f:
        expected_der = ssl.PEM_cert_to_DER_cert(f.read())
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1465
Christian Heimes72d28502013-11-23 13:56:58 +01001466 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001467 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001468 ctx.load_default_certs()
1469
Christian Heimesa170fa12017-09-15 20:27:30 +02001470 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001471 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1472 ctx.load_default_certs()
1473
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1476
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001478 self.assertRaises(TypeError, ctx.load_default_certs, None)
1479 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1480
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test fixtures,
    # load_default_certs() must pick up exactly one (non-CA) certificate.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1490
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what load_default_certs() yields without the env vars.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected = baseline_ctx.cert_store_stats()

    # With the env vars set, exactly one more certificate is expected.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected)
1505
def _assert_context_options(self, ctx):
    # Assert that *ctx* has OP_NO_SSLv2 set, plus each of the optional
    # hardening flags below whose constant is non-zero.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        # A zero constant carries no bit to test for, so skip it.
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1520
def test_create_default_context(self):
    # Check protocol, verify mode and option flags of contexts produced
    # by ssl.create_default_context() for several argument combinations.
    def check(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)
        self._assert_context_options(context)

    # Default purpose: hostname checking on, certificates required.
    default_ctx = ssl.create_default_context()
    self.assertTrue(default_ctx.check_hostname)
    check(default_ctx, ssl.CERT_REQUIRED)

    # Explicit trust anchors given through all three arguments at once.
    with open(SIGNING_CA) as f:
        cadata = f.read()
    anchored_ctx = ssl.create_default_context(cafile=SIGNING_CA,
                                              capath=CAPATH,
                                              cadata=cadata)
    check(anchored_ctx, ssl.CERT_REQUIRED)

    # CLIENT_AUTH purpose: no peer certificate requested.
    server_side_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    check(server_side_ctx, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
def test__create_stdlib_context(self):
    # Check protocol, verify mode and option flags of contexts produced
    # by the private ssl._create_stdlib_context() helper.
    def check(context, protocol, verify_mode):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        self._assert_context_options(context)

    # Defaults: no verification, no hostname check.
    context = ssl._create_stdlib_context()
    self.assertFalse(context.check_hostname)
    check(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)

    # Explicit protocol only.
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    check(context, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol with verification and hostname check requested.
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    self.assertTrue(context.check_hostname)
    check(context, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED)

    # Purpose given by keyword.
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    check(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001566
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001568 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001569 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001570 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571
Christian Heimese82c0342017-09-15 20:29:57 +02001572 # Auto set CERT_REQUIRED
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1576 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001577 ctx.verify_mode = ssl.CERT_REQUIRED
1578 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001579 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001580
Christian Heimese82c0342017-09-15 20:29:57 +02001581 # Changing verify_mode does not affect check_hostname
1582 ctx.check_hostname = False
1583 ctx.verify_mode = ssl.CERT_NONE
1584 ctx.check_hostname = False
1585 self.assertFalse(ctx.check_hostname)
1586 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1587 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588 ctx.check_hostname = True
1589 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001590 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1591
1592 ctx.check_hostname = False
1593 ctx.verify_mode = ssl.CERT_OPTIONAL
1594 ctx.check_hostname = False
1595 self.assertFalse(ctx.check_hostname)
1596 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1597 # keep CERT_OPTIONAL
1598 ctx.check_hostname = True
1599 self.assertTrue(ctx.check_hostname)
1600 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001601
1602 # Cannot set CERT_NONE with check_hostname enabled
1603 with self.assertRaises(ValueError):
1604 ctx.verify_mode = ssl.CERT_NONE
1605 ctx.check_hostname = False
1606 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001607 ctx.verify_mode = ssl.CERT_NONE
1608 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001609
Christian Heimes5fe668c2016-09-12 00:01:11 +02001610 def test_context_client_server(self):
1611 # PROTOCOL_TLS_CLIENT has sane defaults
1612 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1613 self.assertTrue(ctx.check_hostname)
1614 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1615
1616 # PROTOCOL_TLS_SERVER has different but also sane defaults
1617 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1618 self.assertFalse(ctx.check_hostname)
1619 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1620
Christian Heimes4df60f12017-09-15 20:26:05 +02001621 def test_context_custom_class(self):
1622 class MySSLSocket(ssl.SSLSocket):
1623 pass
1624
1625 class MySSLObject(ssl.SSLObject):
1626 pass
1627
1628 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1629 ctx.sslsocket_class = MySSLSocket
1630 ctx.sslobject_class = MySSLObject
1631
1632 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1633 self.assertIsInstance(sock, MySSLSocket)
1634 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1635 self.assertIsInstance(obj, MySSLObject)
1636
@unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
def test_num_tickest(self):
    # Exercise the SSLContext.num_tickets attribute.
    # NOTE: the method name is a misspelling of "num_tickets"; it is kept
    # as-is so the test id stays stable.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_ctx.num_tickets, 2)
    for count in (1, 0):
        server_ctx.num_tickets = count
        self.assertEqual(server_ctx.num_tickets, count)
    # Negative and non-integer values are rejected.
    for invalid, error in ((-1, ValueError), (None, TypeError)):
        with self.assertRaises(error):
            server_ctx.num_tickets = invalid

    # On a client context the value can be read but not changed.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.num_tickets, 2)
    with self.assertRaises(ValueError):
        client_ctx.num_tickets = 1
1654
Antoine Pitrou152efa22010-05-16 18:19:27 +00001655
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001656class SSLErrorTests(unittest.TestCase):
1657
1658 def test_str(self):
1659 # The str() of a SSLError doesn't include the errno
1660 e = ssl.SSLError(1, "foo")
1661 self.assertEqual(str(e), "foo")
1662 self.assertEqual(e.errno, 1)
1663 # Same for a subclass
1664 e = ssl.SSLZeroReturnError(1, "foo")
1665 self.assertEqual(str(e), "foo")
1666 self.assertEqual(e.errno, 1)
1667
1668 def test_lib_reason(self):
1669 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001670 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001671 with self.assertRaises(ssl.SSLError) as cm:
1672 ctx.load_dh_params(CERTFILE)
1673 self.assertEqual(cm.exception.library, 'PEM')
1674 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1675 s = str(cm.exception)
1676 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1677
1678 def test_subclass(self):
1679 # Check that the appropriate SSLError subclass is raised
1680 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001681 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1682 ctx.check_hostname = False
1683 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001684 with socket.create_server(("127.0.0.1", 0)) as s:
1685 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001686 c.setblocking(False)
1687 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001688 with self.assertRaises(ssl.SSLWantReadError) as cm:
1689 c.do_handshake()
1690 s = str(cm.exception)
1691 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1692 # For compatibility
1693 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1694
1695
Christian Heimes61d478c2018-01-27 15:51:38 +01001696 def test_bad_server_hostname(self):
1697 ctx = ssl.create_default_context()
1698 with self.assertRaises(ValueError):
1699 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1700 server_hostname="")
1701 with self.assertRaises(ValueError):
1702 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1703 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001704 with self.assertRaises(TypeError):
1705 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1706 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001707
1708
class MemoryBIOTests(unittest.TestCase):
    """Tests for the read/write/eof/pending behaviour of ssl.MemoryBIO."""

    def test_read_write(self):
        # Data comes back in write order; reading an empty BIO gives b''.
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        # `eof` only becomes true once write_eof() was called *and* all
        # buffered data has been read.
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        # `pending` tracks the number of buffered bytes.
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Drain one byte at a time ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... then refill one byte at a time.
        for buffered in range(1, 4):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # write() accepts any of these bytes-like objects.
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        # write() rejects objects that are not bytes-like.
        buf = ssl.MemoryBIO()
        for not_bytes_like in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_bytes_like)
1770
1771
Christian Heimes9d50ab52018-02-27 10:17:30 +01001772class SSLObjectTests(unittest.TestCase):
1773 def test_private_init(self):
1774 bio = ssl.MemoryBIO()
1775 with self.assertRaisesRegex(TypeError, "public constructor"):
1776 ssl.SSLObject(bio, bio)
1777
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001778 def test_unwrap(self):
1779 client_ctx, server_ctx, hostname = testing_context()
1780 c_in = ssl.MemoryBIO()
1781 c_out = ssl.MemoryBIO()
1782 s_in = ssl.MemoryBIO()
1783 s_out = ssl.MemoryBIO()
1784 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1785 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1786
1787 # Loop on the handshake for a bit to get it settled
1788 for _ in range(5):
1789 try:
1790 client.do_handshake()
1791 except ssl.SSLWantReadError:
1792 pass
1793 if c_out.pending:
1794 s_in.write(c_out.read())
1795 try:
1796 server.do_handshake()
1797 except ssl.SSLWantReadError:
1798 pass
1799 if s_out.pending:
1800 c_in.write(s_out.read())
1801 # Now the handshakes should be complete (don't raise WantReadError)
1802 client.do_handshake()
1803 server.do_handshake()
1804
1805 # Now if we unwrap one side unilaterally, it should send close-notify
1806 # and raise WantReadError:
1807 with self.assertRaises(ssl.SSLWantReadError):
1808 client.unwrap()
1809
1810 # But server.unwrap() does not raise, because it reads the client's
1811 # close-notify:
1812 s_in.write(c_out.read())
1813 server.unwrap()
1814
1815 # And now that the client gets the server's close-notify, it doesn't
1816 # raise either.
1817 c_in.write(s_out.read())
1818 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001819
Martin Panter3840b2a2016-03-27 01:53:46 +00001820class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001821 """Tests that connect to a simple server running in the background"""
1822
def setUp(self):
    # Start a ThreadedEchoServer for the duration of each test and record
    # the address clients should connect to.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    # Enter the server's context manager by hand and register the matching
    # exit as a cleanup, so it is undone when the test finishes.
    echo_server.__enter__()
    no_exception = (None, None, None)
    self.addCleanup(echo_server.__exit__, *no_exception)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001828
def test_connect(self):
    # Without certificate verification the connection succeeds and no
    # peer certificate details are exposed.
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001843
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001853
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    # A blocking connect_ex() returns 0 on success ...
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    # ... and the peer certificate is then available.
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001862
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], 5.0)
    # Non-blocking handshake: retry until it completes, waiting for the
    # direction the SSL layer asks for in between attempts.
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001888
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def new_socket():
        return socket.socket(socket.AF_INET)

    with context.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
    # Same with a server hostname
    with context.wrap_socket(new_socket(), server_hostname="dummy") as conn:
        conn.connect(self.server_addr)

    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1906
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001917
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The check runs twice: first with a str `capath`, then with the
    # same directory given as bytes.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001940
def test_connect_cadata(self):
    # Verify the server certificate with CA data passed in memory via
    # `cadata`: first the PEM text, then the same certificate as DER.
    with open(SIGNING_CA) as ca_file:
        pem_text = ca_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)

    for cadata in (pem_text, der_bytes):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            self.assertTrue(conn.getpeercert())
Christian Heimesefff7062013-11-21 03:35:02 +01001961
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: a file-like object created with makefile() must not
    # delay closing the underlying "real socket".  The check goes through
    # the raw file descriptor, which is why Windows is skipped.
    ssl_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    ssl_sock.connect(self.server_addr)
    descriptor = ssl_sock.fileno()

    file_obj = ssl_sock.makefile()
    file_obj.close()
    # Closing the file object alone leaves the descriptor open.
    os.read(descriptor, 0)

    # Closing the SSL socket should close the descriptor too.
    ssl_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(descriptor, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001980
def test_non_blocking_handshake(self):
    # Drive the TLS handshake by hand on a non-blocking socket, waiting
    # with select() whenever OpenSSL asks for more I/O.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    conn = test_wrap_socket(raw_sock,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [conn], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002001
def test_get_server_certificate(self):
    # Fetch the background server's certificate through the shared
    # helper, passing SIGNING_CA as the CA file.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002004
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so the failing
    # variant runs in its own test method.
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002009
def test_ciphers(self):
    # Well-formed cipher strings must allow a connection.
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)

    # A nonsense cipher string must be rejected.  Error checking can
    # happen at instantiation or when connecting, so both steps sit
    # inside the assertion.
    bogus_spec = "^$:,;?*'dorothyx"
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            conn = test_wrap_socket(raw_sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers=bogus_spec)
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002023
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: nothing is listed until a
    # connection has actually needed one.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])

    raw_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())

    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002035
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    def make_client_context():
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(capath=CAPATH)
        return context

    original = make_client_context()
    replacement = make_client_context()
    raw_sock = socket.socket(socket.AF_INET)
    with original.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python wrapper and the low-level object report the
        # context the socket was created from ...
        self.assertIs(conn.context, original)
        self.assertIs(conn._sslobj.context, original)
        # ... and both follow an assignment to `context`.
        conn.context = replacement
        self.assertIs(conn.context, replacement)
        self.assertIs(conn._sslobj.context, replacement)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002051
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    # A simple IO loop.  Call func(*args) repeatedly; depending on the
    # error we get (WANT_READ or WANT_WRITE) move data between the socket
    # and the BIOs.  Returns whatever func() finally returns.  The only
    # keyword consulted is `timeout` (seconds, default 10); exceeding it
    # fails the test.
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        want = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            want = exc.errno
        # Whatever happened, flush the outgoing BIO to the socket.
        sock.sendall(outgoing.read())
        # No error means func() completed and we are done.
        if want is None:
            break
        # For WANT_READ, feed the incoming BIO from the socket; an empty
        # read is passed on as EOF.
        if want == ssl.SSL_ERROR_WANT_READ:
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return ret
2088
def test_bio_handshake(self):
    # Perform a handshake through MemoryBIO objects and check what the
    # SSLObject reports before and after it.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing,
                              server_side=False,
                              server_hostname=SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2122
def test_bio_read_write_data(self):
    # Exchange application data through MemoryBIO objects: the request
    # is expected to come back lower-cased.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    def pump(operation, *op_args):
        # Every step goes through the same socket/BIO pumping loop.
        return self.ssl_io_loop(sock, incoming, outgoing, operation, *op_args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002138
2139
class NetworkedTests(unittest.TestCase):

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            raw_sock = socket.socket(socket.AF_INET)
            conn = test_wrap_socket(raw_sock,
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            # A timeout this small is expected to expire before the
            # connection can be established.
            conn.settimeout(0.0000001)
            status = conn.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate-fetching helpers against an IPv6 host.
        remote = 'ipv6.google.com'
        with support.transient_internet(remote):
            _test_get_server_certificate(self, remote, 443)
            _test_get_server_certificate_fail(self, remote, 443)
2161
Martin Panter3840b2a2016-03-27 01:53:46 +00002162
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of (host, port) twice and fail *test* if empty.

    The first request uses no CA file, the second passes *cert* as
    ``ca_certs``.  In verbose mode the certificate from the second request
    is written to stdout.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2173
def _test_get_server_certificate_fail(test, host, port):
    """Expect fetching the certificate with CERTFILE as CA file to fail.

    An ``ssl.SSLError`` is the expected outcome (echoed to stdout in
    verbose mode); obtaining a certificate fails *test*.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2183
2184
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002185from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002186
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used as the peer in tests.

    ``run()`` accepts connections on ``self.sock`` and serves each one with
    a ``ConnectionHandler`` thread, joining it before accepting again, so
    connections are handled one at a time.  Entering the instance as a
    context manager starts the thread and waits until the socket is
    listening; leaving it stops and joins the thread.

    Results collected from the handlers are exposed as the lists
    ``selected_npn_protocols``, ``selected_alpn_protocols``,
    ``shared_ciphers`` and ``conn_errors``.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Bind the handler to *server* and the accepted *connsock*.

            *addr* is the peer address; it is only used in diagnostic
            output.  The connection starts unwrapped (``sslconn`` is None).
            """
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            # Force blocking mode on the accepted socket.
            self.sock.setblocking(1)
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap ``self.sock`` server-side with the server's context.

            On success the negotiated NPN/ALPN protocols and shared ciphers
            are appended to the server's result lists and True is returned.
            On failure the error text is appended to ``server.conn_errors``,
            the connection is closed and False is returned; for the generic
            SSLError/OSError case the whole server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                #
                # Unlike the generic branch below, this one does not stop
                # the server.  The error is stored as a string, as in the
                # branch below.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    # handle_error is a helper defined elsewhere in this file.
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                # The peer certificate is only inspected when the context
                # requires one.
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if wrapped, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the TLS connection if wrapped, else to the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over' or an error.

            Unless the server is in STARTTLS mode the connection is wrapped
            first.  Each message read is stripped and dispatched:

            * empty        -- EOF: unwrap if possible, close, stop looping
            * b'over'      -- close and return
            * b'STARTTLS'  -- (STARTTLS servers) reply OK, then wrap
            * b'ENDTLS'    -- (STARTTLS servers, while wrapped) reply OK,
                              then unwrap
            * b'CB tls-unique' -- reply with repr() of the channel binding
            * b'PHA'       -- run post-handshake auth, reply OK or the error
            * b'HASCERT'   -- reply TRUE/FALSE for the peer certificate
            * b'GETCERT'   -- reply with repr() of the peer certificate
            * anything else -- echo the message back lower-cased
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # NOTE(review): if sslconn is still None here (STARTTLS
                        # server, EOF before STARTTLS) the unwrap() call
                        # raises AttributeError, which no handler below
                        # catches -- confirm whether that path can occur.
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Go back to the plain socket for subsequent I/O.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            # Report the failure to the client instead of
                            # tearing the connection down.
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except ssl.SSLError as err:
                    # On Windows sometimes test_pha_required_nocert receives the
                    # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                    # before the 'tlsv13 alert certificate required' exception.
                    # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                    # is received test_pha_required_nocert fails with ConnectionResetError
                    # because the underlying socket is closed
                    #
                    # NOTE(review): any SSLError with a different reason is
                    # swallowed here and the loop continues without closing
                    # the connection -- confirm this is intended.
                    if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If *context* is given it is used as is.  Otherwise a context is
        built from *ssl_version* (default ``PROTOCOL_TLS_SERVER``) and
        *certreqs* (default ``CERT_NONE``), and *cacerts*, *certificate*,
        *npn_protocols*, *alpn_protocols* and *ciphers* are applied to it
        when set.

        *chatty* and *connectionchatty* control diagnostic output;
        *starttls_server* makes handlers start unencrypted and wait for a
        STARTTLS message.  The bound port is available as ``self.port``.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Event set by run() once the socket is listening (see start()).
        self.flag = None
        self.active = False
        # Result lists filled in by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and block until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the accept loop to stop and wait for the thread to finish."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept loop: serve connections one by one until stop() is called."""
        # The short timeout lets the loop notice stop() between accepts.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for the handler: connections are served serially.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                # Anything else is only reported (in verbose mode); the
                # loop keeps running.
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Make the accept loop exit after its current iteration."""
        self.active = False
2463
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore, driven from a dedicated thread.

    The thread repeatedly runs ``asyncore.loop()`` until ``stop()`` is
    called.  Entering the instance as a context manager starts the thread
    and waits until it is running; leaving it stops and joins the thread
    and closes every dispatcher left in asyncore's socket map.  The
    listening port is available as ``self.port``.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher that wraps each accepted socket in TLS."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: finish the handshake, then echo."""

            def __init__(self, conn, certfile):
                """Wrap *conn* server-side and start the handshake."""
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Drain data already buffered by the SSL layer, then poll.

                While ``pending()`` reports buffered bytes, read events are
                dispatched directly instead of waiting for the socket.
                """
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step.

                Want-read/want-write leave the handshake pending; EOF or an
                aborted connection closes the handler; any other SSLError
                propagates.  On success the handler switches to echo mode.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed explicitly so that the OSError clause below
                    # (SSLError is an OSError subclass) cannot swallow it.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                """Close the connection, reporting it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                """Re-raise the active exception instead of logging it."""
                raise

        def __init__(self, certfile):
            """Bind a listening socket and register it with asyncore."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Create a ConnectionHandler for the accepted connection."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            """Re-raise the active exception instead of logging it."""
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer that uses *certfile*."""
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the server thread and block until it is running."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop and join the thread, then clear asyncore's socket map."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() until stop() clears ``self.active``."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Best effort: the handle_error() overrides re-raise, so
                # handler failures surface here and must not end the loop.
                # Only Exception is caught so that SystemExit and
                # KeyboardInterrupt are not silently swallowed.
                pass

    def stop(self):
        """Stop the loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002581
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must
    equal ``indata.lower()`` or AssertionError is raised.  Returns a dict
    of connection statistics gathered from the client socket and from the
    server's result lists.
    """
    def client_log(template, *values):
        # Client-side chatter needs both connectionchatty and verbose mode.
        if connectionchatty:
            if support.verbose:
                sys.stdout.write(template % values)

    # NOTE(review): the server is always created with connectionchatty=False,
    # whatever the caller passed -- confirm this is intended.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    stats = {}
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as conn:
            conn.connect((HOST, server.port))
            expected = indata.lower()
            for payload in (indata, bytearray(indata), memoryview(indata)):
                client_log(" client: sending %r...\n", indata)
                conn.write(payload)
                outdata = conn.read()
                client_log(" client: read %r\n", outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            conn.write(b"over\n")
            client_log(" client: closing connection.\n")
            stats.update({
                'compression': conn.compression(),
                'cipher': conn.cipher(),
                'peercert': conn.getpeercert(),
                'client_alpn_protocol': conn.selected_alpn_protocol(),
                'client_npn_protocol': conn.selected_npn_protocol(),
                'version': conn.version(),
                'session_reused': conn.session_reused,
                'session': conn.session,
            })
            conn.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002631
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.

    If *expect_success* is true, the connection must succeed; if it is
    false, the connection must fail (with ssl.SSLError or a
    "connection reset" OSError).  When *expect_success* is a string it
    must additionally equal the protocol version reported by the
    connection.  A violated expectation raises AssertionError or
    re-raises the unexpected connection error.

    *certsreqs* is the verify mode applied to both contexts (defaults
    to ssl.CERT_NONE); *server_options* and *client_options* are OR-ed
    into the respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are printed inside braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(template % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    if (min_version is not None
            and hasattr(server_context, 'minimum_version')
            and server_protocol == ssl.PROTOCOL_TLS):
        if server_context.minimum_version > min_version:
            # If OpenSSL configuration is strict and requires more recent
            # TLS version, we have to change the minimum to test old TLS
            # versions.
            server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    # The connection went through: check that this was expected.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2701
2702
2703class ThreadedTests(unittest.TestCase):
2704
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Generic protocols: one context plays both roles.  The two
    # role-specific protocols are covered separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Matching roles: expected to work.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Mismatched roles: each case must fail with the same SSLError text.
    # Columns: subTest client label, subTest server label, context used
    # as client, context used as server, extra keyword arguments.
    # NOTE(review): the last row is labelled client=PROTOCOL_TLS_CLIENT
    # but passes server_context as the client, exactly like the first
    # row minus sni_name -- confirm whether that is intended.
    misuse = 'called a function you should not call'
    failing_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in failing_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn(misuse, str(e.exception))
2751
def test_getpeercert(self):
    """Check getpeercert() before and after an explicit handshake."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()

        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')

        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        # The subject is searched for this exact relative distinguished
        # name entry.
        expected_rdn = (('organizationName', 'Python Software Foundation'),)
        if expected_rdn not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # The validity window must be present and correctly ordered.
        self.assertIn('notBefore', cert)
        self.assertIn('notAfter', cert)
        before = ssl.cert_time_to_seconds(cert['notBefore'])
        after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002787
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    def connect_and_check_cert():
        # Start a fresh server, connect and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    connect_and_check_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    connect_and_check_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002829
def test_check_hostname(self):
    """Hostname checking: matching, mismatching and missing names."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as s:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as raw_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(raw_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002862
def test_ecc_cert(self):
    """Connect to a server that only presents an ECC certificate."""
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) took the tuple
            # as the failure message and never compared anything.
            # TLS 1.3 is not disabled in this test, and its suite names
            # (which set_ciphers() cannot restrict) carry no
            # key-exchange/authentication prefix, so the prefix is only
            # checked for older protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2883
def test_dual_rsa_ecc(self):
    """Server holds ECC and RSA certs; an ECDSA-only client gets ECC."""
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) took the tuple
            # as the failure message and never compared anything.
            # TLS 1.3 is disabled above, so the negotiated suite name is
            # expected to start with the ECDHE-ECDSA prefix.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2909
def test_check_hostname_idn(self):
    """Hostname checks against a certificate with IDN subjectAltNames."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways.  Each entry pairs the name handed to
    # wrap_socket() with the value expected in s.server_hostname.
    idn_hostnames = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
    ]
    for requested_name, expected_name in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, context.wrap_socket(
                socket.socket(), server_hostname=requested_name) as s:
            # The name is checked both before and after the handshake.
            self.assertEqual(s.server_hostname, expected_name)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_name)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2965
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                s.connect((HOST, server.port))
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Only a connection reset counts as an expected failure.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3002
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """TLS 1.3 variant of the rejected-client-certificate test.

    Both sides are pinned to TLS 1.3 as minimum version; the failure is
    expected on the first write/read after connecting rather than in
    connect() itself.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    server_context.minimum_version = ssl.TLSVersion.TLSv1_3
    client_context.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Only a connection reset counts as an expected failure.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3033
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread.  Sits in accept() until the main thread
        # connects, then rudely closes both sockets and sets
        # `listener_gone` to let the main thread know the socket is
        # gone.
        listen_sock.listen()
        listener_ready.set()
        accepted, _addr = listen_sock.accept()
        accepted.close()
        listen_sock.close()
        listener_gone.set()

    def connector():
        # Connect in clear text, wait for the listener to vanish, then
        # try to wrap the already-dead connection.
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003074
def test_ssl_cert_verify_error(self):
    """Verification failure must surface as SSLCertVerificationError.

    The client context is created without loading any CA, so the
    handshake against the signed server certificate is expected to
    fail with verify code 20 and the matching verify message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                msg = 'unable to get local issuer certificate'
                self.assertIsInstance(e, ssl.SSLCertVerificationError)
                self.assertEqual(e.verify_code, 20)
                self.assertEqual(e.verify_message, msg)
                self.assertIn(msg, repr(e))
                self.assertIn('certificate verify failed', repr(e))
            else:
                # Without this branch the test passed vacuously when
                # the handshake unexpectedly succeeded.
                self.fail("Certificate verification should have failed!")
3097
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # Same protocol on both sides, once per certificate requirement
    # (None selects try_protocol_combo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, reqs)

    # Clients speaking any other protocol must be rejected.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003120
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three combinations are tried for each certificate
    # requirement (None selects try_protocol_combo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, reqs)
        try_protocol_combo(tls, tls, True, reqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', reqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3158
3159
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # Same protocol on both sides, once per certificate requirement
    # (None selects try_protocol_combo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', reqs)

    # Clients that cannot or may not speak SSLv3 must be rejected.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3178
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # Same protocol on both sides, once per certificate requirement
    # (None selects try_protocol_combo's default).
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', reqs)

    # Legacy SSL clients, where compiled in, must be rejected.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)

    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3192
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Legacy SSL clients, where compiled in, must be rejected.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)

    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # A generic TLS server accepts a TLSv1.1 client; TLSv1 and TLSv1.1
    # must not interoperate in either direction.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3211
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2)

    # Legacy SSL clients, where compiled in, must be rejected.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)

    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # A generic TLS server accepts a TLSv1.2 client; TLSv1.2 must not
    # interoperate with TLSv1 or TLSv1.1 in either direction.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3234
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""

    def say(fmt, *args):
        # Progress output is only produced in verbose runs.
        if support.verbose:
            sys.stdout.write(fmt % args)

    script = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
              b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    tls_active = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        say("\n")
        for request in script:
            say(" client: sending %r...\n", request)
            # Talk over whichever transport is currently in effect.
            if tls_active:
                conn.write(request)
                raw_reply = conn.read()
            else:
                s.send(request)
                raw_reply = s.recv(1024)
            reply = raw_reply.strip().lower()
            acknowledged = reply.startswith(b"ok")

            if request == b"STARTTLS" and acknowledged:
                # Server agreed: wrap the plain socket.
                say(" client: read %r from server, starting TLS...\n",
                    reply)
                conn = test_wrap_socket(s)
                tls_active = True
            elif request == b"ENDTLS" and acknowledged:
                # Server agreed: shut TLS down and continue in clear text.
                say(" client: read %r from server, ending TLS...\n",
                    reply)
                s = conn.unwrap()
                tls_active = False
            else:
                say(" client: read %r from server\n", reply)

        say(" client: closing connection.\n")
        # Say goodbye and close on the transport that is active right now.
        if tls_active:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003291
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    # Reference copy of the file, read straight from disk.
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Must be bytes like d1: comparing bytes with str trips BytesWarning
    # under -b/-bb instead of yielding a plain assertion failure.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response is a context manager, so it is closed on every path.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003317
def test_asyncore_server(self):
    """Round-trip one message through the asyncore-based echo server."""

    def say(fmt, *args):
        # Progress output is only produced in verbose runs.
        if support.verbose:
            sys.stdout.write(fmt % args)

    say("\n")

    payload = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        say(" client: sending %r...\n", payload)
        client.write(payload)
        echoed = client.read()
        say(" client: read %r\n", echoed)
        # The reply is expected to be the payload in lower case.
        wanted = payload.lower()
        if echoed != wanted:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (echoed[:20], len(echoed),
                   wanted[:20], len(payload)))
        client.write(b"over\n")
        say(" client: closing connection.\n")
        client.close()
        say(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003346
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour is tried against the echo server.  Flavours
    marked as "expect success" must round-trip the lower-cased payload;
    the others must raise ValueError with a message that starts with the
    method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        # Release the socket even if an assertion below fails; the
        # explicit close() at the end still covers the normal path.
        self.addCleanup(s.close)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            expected = indata.lower()
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != expected:
                    # "!r" is a conversion; a ":r" format spec on bytes
                    # would raise TypeError and hide this message.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # "!s" converts the exception; ":s" is rejected by
                    # the exception's __format__.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            expected = indata.lower()
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != expected:
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        # Negative sizes are rejected by both recv() and read().
        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3483
def test_recv_zero(self):
    # Zero-length reads on a TLS socket: they return nothing, leave
    # pending data untouched, and do not block.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv/read(0) should return no data
    tls_sock.send(b"data")
    for zero_length_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_length_read(0), b"")
    # The echoed payload is still there for a normal read.
    self.assertEqual(tls_sock.read(), b"data")

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    self.assertEqual(tls_sock.recv_into(bytearray()), 0)
3503
def test_nonblocking_send(self):
    # A non-blocking TLS socket must raise SSLWantWrite/ReadError once
    # send() can make no more progress.
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        tls_sock = test_wrap_socket(socket.socket(),
                                    server_side=False,
                                    certfile=CERTFILE,
                                    ca_certs=CERTFILE,
                                    cert_reqs=ssl.CERT_NONE,
                                    ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        tls_sock.connect((HOST, echo_server.port))
        tls_sock.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                tls_sock.send(chunk)

        # Now read all the output and discard it
        tls_sock.setblocking(True)
        tls_sock.close()
3533
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def accept_and_stall():
        # Accept TCP connections but never start a TLS handshake, so any
        # client-side handshake has to run into its timeout.
        listener.listen()
        listening.set()
        held = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                held.append(listener.accept()[0])
        for stalled in held:
            stalled.close()

    worker = threading.Thread(target=accept_and_stall)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrapping an already connected socket starts the
        # handshake, which must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: connect() on an already wrapped socket performs the
        # handshake, which must time out as well.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell the accept loop to stop, then tear everything down.
        finish = True
        worker.join()
        listener.close()
3582
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    host = "127.0.0.1"
    listener = socket.socket(socket.AF_INET)
    port = support.bind_port(listener)
    listener = context.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    # Filled in by the server thread once accept() has returned.
    accepted = {'conn': None, 'addr': None}

    def serve_one():
        listener.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        accepted['conn'], accepted['addr'] = listener.accept()
        # Echo four bytes back to the client.
        accepted['conn'].send(accepted['conn'].recv(4))

    worker = threading.Thread(target=serve_one)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['conn'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['conn'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
3623
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    tls_sock = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(socket.socket())
    with tls_sock:
        with self.assertRaises(OSError) as caught:
            tls_sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3630
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    tls_sock = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(socket.socket())
    with tls_sock:
        with self.assertRaises(OSError) as caught:
            tls_sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3637
def test_no_shared_ciphers(self):
    # Disjoint cipher selections must make the handshake fail, and the
    # server must record a "no shared cipher" error.
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    for ctx, suite in ((client_context, "AES128"),
                       (server_context, "AES256")):
        ctx.set_ciphers(suite)

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            with self.assertRaises(OSError):
                client.connect((HOST, server.port))
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3651
def test_version_basic(self):
    """
    Check SSLSocket.version() before, during and after a connection.
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with context.wrap_socket(socket.socket()) as s:
            # Not connected yet: no version and no SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # Work out which single version is expected, if any.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                exact = 'TLSv1.3'
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                exact = 'TLSv1.2'
            else:  # 0.9.8 to 1.0.1
                exact = None
            if exact is not None:
                self.assertEqual(s.version(), exact)
            else:
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Socket closed again: both are back to None.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3675
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0, 1.1 and 1.2 disabled on the shared context, the
    # connection must use TLSv1.3 and one of its cipher suites.
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    context.options |= older_versions_off
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn(cipher_name, tls13_suites)
            self.assertEqual(client.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003693
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check that minimum_version / maximum_version on both contexts
    # decide the negotiated version, and that disjoint ranges fail.
    client_context, server_context, hostname = testing_context()
    # client TLSv1.0 to 1.2
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = ssl.TLSVersion.TLSv1
    server_context.maximum_version = ssl.TLSVersion.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # Pin both ends of the client range.  maximum_version used to be
    # assigned twice here, leaving the lower bound implicit.
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    client_context.maximum_version = ssl.TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(e.exception))
3732
3733
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 and a server that allows SSLv3 as its
    # minimum must negotiate SSLv3.
    sslv3 = ssl.TLSVersion.SSLv3
    client_context, server_context, hostname = testing_context()
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'SSLv3')
3747
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3767
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive connections are made.  Each binding value must be
    present, have the expected length, and match what the server reports
    for the same connection; the two values must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def check_binding(sock, data):
        # Validate the binding obtained on *sock* and compare it with
        # the peer's view of it.
        self.assertIsNotNone(data)
        if sock.version() == 'TLSv1.3':
            self.assertEqual(len(data), 48)
        else:
            self.assertEqual(len(data), 12)  # True for TLSv1
        # Ask the server for its value; the reply is compared against
        # the repr() of our own.
        sock.write(b"CB tls-unique\n")
        peer_data_repr = sock.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(data).encode("us-ascii"))

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane and compare with the peers version
            check_binding(s, cb_data)

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Validate the NEW value; the first one was already checked
            # above and re-checking it here would prove nothing.
            check_binding(s, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003827
def test_compression(self):
    # The reported compression method must be None, 'ZLIB' or 'RLE'.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, {None, 'ZLIB', 'RLE'})
3836
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both contexts, no compression
    # method may be reported.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3847
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the cipher entry is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    # At least one dash-separated component must name a DH key exchange.
    if not {"ADH", "EDH", "DHE"}.intersection(parts):
        # Report the whole name; indexing it again would print only its
        # first character.
        self.fail("Non-DH cipher: " + cipher)
3863
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Handshakes with an explicit ECDH curve on the server, on the
    # client, and with mismatching curves on both.

    def handshake(client_curve, server_curve):
        # Fresh contexts for every scenario; a curve of None leaves that
        # side on its default.
        client_context, server_context, hostname = testing_context()
        if client_curve is not None:
            client_context.set_ecdh_curve(client_curve)
        if server_curve is not None:
            server_context.set_ecdh_curve(server_curve)
        server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
        server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        return server_params_test(client_context, server_context,
                                  chatty=True, connectionchatty=True,
                                  sni_name=hostname)

    # server secp384r1, client auto
    handshake(None, "secp384r1")

    # server auto, client secp384r1
    handshake("secp384r1", None)

    # server / client curve mismatch
    try:
        handshake("prime256v1", "secp384r1")
    except ssl.SSLError:
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3902
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3910
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server side advertises protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3920
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For several client offers, check which ALPN protocol the client
    # and the server report against a fixed server-side list.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (client offer, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Keep a handshake failure instead of letting it propagate; it
        # is inspected below.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as exc:
            stats = exc

        handshake_error_expected = (
            expected is None
            and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two escaped placeholders are filled in per side below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # Use the last entry recorded for the server, or 'nothing' when
        # there is none.
        server_seen = stats['server_alpn_protocols']
        if len(server_seen):
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3960
def test_selected_npn_protocol(self):
    # Neither context configures NPN, so the client must report that no
    # NPN protocol was selected.
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(
        client_ctx,
        server_ctx,
        chatty=True,
        connectionchatty=True,
        sni_name=sni,
    )
    self.assertIs(result['client_npn_protocol'], None)
3968
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Run one handshake per client protocol list and check which protocol
    # both sides report as selected.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be selected)
    scenarios = (
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    )
    for client_protocols, expected in scenarios:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx.set_npn_protocols(client_protocols)
        outcome = server_params_test(client_ctx, server_ctx,
                                     chatty=True, connectionchatty=True,
                                     sni_name=sni)

        # The remaining two %s slots are filled per side below.
        template = ("failed trying {} (s) and {} (c).\n"
                    "was expecting {}, but got %s from the %s"
                    .format(str(server_protocols), str(client_protocols),
                            str(expected)))

        client_result = outcome['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         template % (client_result, "client"))

        # The server side records a list; only its last entry matters.
        negotiated = outcome['server_npn_protocols']
        server_result = negotiated[-1] if negotiated else 'nothing'
        self.assertEqual(server_result, expected,
                         template % (server_result, "server"))
3994
def sni_contexts(self):
    # Build the contexts shared by the SNI tests and return them as
    # (server_context, other_context, client_context).  The two server
    # contexts differ only in the certificate chain they load; the client
    # trusts SIGNING_CA.
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_side(SIGNED_CERTFILE)
    other_context = server_side(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
4003
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains a
    # single-attribute entry ('commonName', name).
    subject = stats['peercert']['subject']
    wanted = (('commonName', name),)
    self.assertIn(wanted, subject)
4007
@needs_sni
def test_sni_callback(self):
    # Exercise the servername callback: with a server name, without one,
    # and after the callback has been removed again.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch to the other context only when
        # the client actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The callback received the requested name together with the initial
    # context ...
    self.assertEqual(seen, [("supermessage", server_context)])
    # ... and, having switched to other_context, the connection presented
    # the certificate whose commonName is 'fakehostname'.
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # Without SNI the callback is still called, with server_name=None,
    # and the certificate of the initial context is used.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change and nothing was recorded.
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4048
@needs_sni
def test_sni_callback_alert(self):
    # A TLS alert code returned by the servername callback is reflected
    # to the connecting client as an SSLError with the matching reason.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as raised:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(raised.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004062
@needs_sni
def test_sni_callback_raising(self):
    # An exception raised by the servername callback fails the connection
    # with a TLS handshake failure alert; the exception itself is
    # captured through support.catch_unraisable_exception().
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0
    server_context.set_servername_callback(cb_raising)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as raised:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        # catch.unraisable is read while still inside the context manager.
        self.assertEqual(raised.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004081
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning a value of the wrong type
    # terminates the TLS connection with an internal error alert; the
    # resulting TypeError is captured through
    # support.catch_unraisable_exception().
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as raised:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        # catch.unraisable is read while still inside the context manager.
        self.assertEqual(raised.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004101
def test_shared_ciphers(self):
    # The client allows AES128 and AES256 but the server only AES256, so
    # every cipher the server reports as shared must be an AES256 one
    # (or a TLS 1.3 suite, see below).
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    acceptable_fragments = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_ctx, server_ctx, sni_name=sni)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        # Fail on the first cipher matching none of the fragments.
        if all(fragment not in cipher_name
               for fragment in acceptable_fragments):
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004119
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that was connected and then
    # closed must raise ValueError.
    client_ctx, server_ctx, sni = testing_context()
    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)

    with echo_server:
        sock = client_ctx.wrap_socket(socket.socket(),
                                      server_hostname=sni)
        sock.connect((HOST, echo_server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004132
def test_sendfile(self):
    # Send a small file through SSLSocket.sendfile() and check that the
    # echo server returns exactly its contents.
    TEST_DATA = b"x" * 512
    # Register the cleanup before touching the file system so that
    # TESTFN is removed even if creating or writing the file fails
    # (same order as the keylog tests in this file).
    self.addCleanup(support.unlink, support.TESTFN)
    with open(support.TESTFN, 'wb') as f:
        f.write(TEST_DATA)

    # A single context serves both ends: it loads a certificate chain
    # for the server role and the CA needed to verify it as a client.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004149
def test_session(self):
    # Four handshakes against the same server context: fresh, resumed,
    # fresh again, resumed again.  After each one the client-side session
    # object and the server's accept/hits counters are checked.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One full client/server round trip; *extra* carries the
        # optional session= argument.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_server_counters(accept, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    second = handshake(session=session)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    # Equal to, but not the same object as, the original session.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004207
def test_session_handling(self):
    # Rules for assigning SSLSocket.session: wrong type, after the
    # handshake, before the handshake, and across different contexts.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def new_socket(context):
        # Fresh, not yet connected SSL socket for *context*.
        return context.wrap_socket(socket.socket(),
                                   server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        address = (HOST, server.port)

        with new_socket(client_context) as sock:
            # session is None before handshake
            self.assertEqual(sock.session, None)
            self.assertEqual(sock.session_reused, None)
            sock.connect(address)
            session = sock.session
            self.assertTrue(session)
            # only SSLSession objects are accepted
            with self.assertRaises(TypeError) as caught:
                sock.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with new_socket(client_context) as sock:
            sock.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                sock.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with new_socket(client_context) as sock:
            # can set session before handshake and before the
            # connection was established
            sock.session = session
            sock.connect(address)
            self.assertEqual(sock.session.id, session.id)
            self.assertEqual(sock.session, session)
            self.assertEqual(sock.session_reused, True)

        with new_socket(client_context2) as sock:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as caught:
                sock.session = session
                sock.connect(address)
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004257
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004258
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    # Tests for TLS 1.3 post-handshake authentication (PHA).  The
    # connection tests talk to a ThreadedEchoServer through short byte
    # commands (b'HASCERT', b'PHA', b'GETCERT') and check its replies.

    @staticmethod
    def _ask(sock, command):
        # Send one command to the echo server and return its reply.
        sock.write(command)
        return sock.recv(1024)

    def test_pha_setter(self):
        # post_handshake_auth and verify_mode can be set independently of
        # each other on every kind of TLS context.
        for protocol in (ssl.PROTOCOL_TLS,
                         ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            ctx = ssl.SSLContext(protocol)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            # changing verify_mode leaves PHA enabled
            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            # disabling PHA leaves verify_mode alone
            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        # Both sides enable PHA and the client has a certificate: the
        # server sees it only after the PHA request.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self.assertEqual(self._ask(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._ask(sock, b'HASCERT'), b'TRUE\n')
            # PHA method just returns true when cert is already available
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            sock.write(b'GETCERT')
            cert_text = sock.recv(4096).decode('us-ascii')
            self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        # The server requires a certificate but the client has none: the
        # connection ends with a "certificate required" alert.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            # receive CertificateRequest
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            # send empty Certificate + Finish
            sock.write(b'HASCERT')
            # receive alert
            with self.assertRaisesRegex(
                    ssl.SSLError,
                    'tlsv13 alert certificate required'):
                sock.recv(1024)

    def test_pha_optional(self):
        # Same exchange as the required case, but the server ends up with
        # verify_mode CERT_OPTIONAL.
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        server_context.verify_mode = ssl.CERT_OPTIONAL
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self.assertEqual(self._ask(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._ask(sock, b'HASCERT'), b'TRUE\n')

    def test_pha_optional_nocert(self):
        # With CERT_OPTIONAL a client without a certificate passes PHA
        # and the server still reports that it has no client cert.
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self.assertEqual(self._ask(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            # optional doesn't fail when client does not have a cert
            self.assertEqual(self._ask(sock, b'HASCERT'), b'FALSE\n')

    def test_pha_no_pha_client(self):
        # The client does not enable PHA: it cannot request verification
        # itself, and the server's PHA attempt reports a missing
        # extension.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                sock.verify_client_post_handshake()
            self.assertIn(b'extension not received',
                          self._ask(sock, b'PHA'))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self.assertEqual(self._ask(sock, b'HASCERT'), b'TRUE\n')
            # PHA doesn't fail if there is already a cert
            self.assertEqual(self._ask(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._ask(sock, b'HASCERT'), b'TRUE\n')

    def test_pha_not_tls13(self):
        # The client is capped at TLS 1.2, so the server's PHA request
        # must be rejected.
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            # PHA fails for TLS != 1.3
            self.assertIn(b'WRONG_SSL_VERSION', self._ask(sock, b'PHA'))
4430
4431
# Key logging is only usable when SSLContext exposes keylog_filename.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
# Decorator that skips a test when key logging is unavailable.
requires_keylog = unittest.skipUnless(
    HAS_KEYLOG,
    'test requires OpenSSL 1.1.1 with keylog callback',
)
4435
class TestSSLDebug(unittest.TestCase):
    # Tests for the debugging hooks of SSLContext: the key log file
    # (keylog_filename / SSLKEYLOGFILE) and the private _msg_callback.

    def keylog_lines(self, fname=support.TESTFN):
        # Number of lines currently in the key log file *fname*.
        with open(fname) as keylog:
            return sum(1 for _line in keylog)

    def _handshake(self, client_context, server_context, hostname):
        # Start an echo server, connect one client to it and shut both
        # down again; used for the side effects of the handshake only.
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))

    @requires_keylog
    def test_keylog_defaults(self):
        # keylog_filename defaults to None, creates the file with a
        # single line when set, and rejects directories and non-path
        # values.
        self.addCleanup(support.unlink, support.TESTFN)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.keylog_filename, None)

        self.assertFalse(os.path.isfile(support.TESTFN))
        ctx.keylog_filename = support.TESTFN
        self.assertEqual(ctx.keylog_filename, support.TESTFN)
        self.assertTrue(os.path.isfile(support.TESTFN))
        self.assertEqual(self.keylog_lines(), 1)

        ctx.keylog_filename = None
        self.assertEqual(ctx.keylog_filename, None)

        parent_dir = os.path.dirname(os.path.abspath(support.TESTFN))
        # Windows raises PermissionError
        with self.assertRaises((IsADirectoryError, PermissionError)):
            ctx.keylog_filename = parent_dir

        with self.assertRaises(TypeError):
            ctx.keylog_filename = 1

    @requires_keylog
    def test_keylog_filename(self):
        # The key log grows with every handshake, whichever side (client,
        # server or both) has logging enabled.
        self.addCleanup(support.unlink, support.TESTFN)
        client_context, server_context, hostname = testing_context()

        # client side only
        client_context.keylog_filename = support.TESTFN
        self._handshake(client_context, server_context, hostname)
        # header, 5 lines for TLS 1.3
        self.assertEqual(self.keylog_lines(), 6)

        # server side only
        client_context.keylog_filename = None
        server_context.keylog_filename = support.TESTFN
        self._handshake(client_context, server_context, hostname)
        self.assertGreaterEqual(self.keylog_lines(), 11)

        # both sides log to the same file
        client_context.keylog_filename = support.TESTFN
        server_context.keylog_filename = support.TESTFN
        self._handshake(client_context, server_context, hostname)
        self.assertGreaterEqual(self.keylog_lines(), 21)

        client_context.keylog_filename = None
        server_context.keylog_filename = None

    @requires_keylog
    @unittest.skipIf(sys.flags.ignore_environment,
                     "test is not compatible with ignore_environment")
    def test_keylog_env(self):
        # SSLKEYLOGFILE is honoured by create_default_context() and
        # _create_stdlib_context(), but not by a plain SSLContext().
        self.addCleanup(support.unlink, support.TESTFN)
        # patch.dict restores os.environ when the block is left.
        with unittest.mock.patch.dict(os.environ):
            os.environ['SSLKEYLOGFILE'] = support.TESTFN
            self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)

            plain_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(plain_ctx.keylog_filename, None)

            default_ctx = ssl.create_default_context()
            self.assertEqual(default_ctx.keylog_filename, support.TESTFN)

            stdlib_ctx = ssl._create_stdlib_context()
            self.assertEqual(stdlib_ctx.keylog_filename, support.TESTFN)

    def test_msg_callback(self):
        # _msg_callback starts as None, stores a function as given and
        # rejects a plain object().
        client_context, server_context, hostname = testing_context()

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            pass

        self.assertIs(client_context._msg_callback, None)
        client_context._msg_callback = msg_cb
        self.assertIs(client_context._msg_callback, msg_cb)
        with self.assertRaises(TypeError):
            client_context._msg_callback = object()

    def test_msg_callback_tls12(self):
        # Record every message reported to the client's callback during a
        # TLS 1.2 handshake and look for two expected entries.
        client_context, server_context, hostname = testing_context()
        client_context.options |= ssl.OP_NO_TLSv1_3

        recorded = []

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            self.assertIsInstance(conn, ssl.SSLSocket)
            self.assertIsInstance(data, bytes)
            self.assertIn(direction, {'read', 'write'})
            recorded.append((direction, version, content_type, msg_type))

        client_context._msg_callback = msg_cb

        self._handshake(client_context, server_context, hostname)

        server_key_exchange = (
            "read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
            _TLSMessageType.SERVER_KEY_EXCHANGE)
        self.assertIn(server_key_exchange, recorded)

        change_cipher_spec = (
            "write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
            _TLSMessageType.CHANGE_CIPHER_SPEC)
        self.assertIn(change_cipher_spec, recorded)
Christian Heimesc7f70692019-05-31 11:44:05 +02004560
4561
def test_main(verbose=False):
    """Run the SSL test classes after checking the certificate fixtures.

    When ``support.verbose`` is set, the OpenSSL version, the platform and
    a few ``ssl`` constants are printed first.  ``NetworkedTests`` is only
    run when the 'network' resource is enabled.

    *verbose* is accepted for callers that pass it but is not consulted;
    ``support.verbose`` controls the diagnostic output.

    Raises ``support.TestFailed`` if a required certificate or key file
    does not exist.
    """
    if support.verbose:
        plats = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first platform-specific probe that reports a version;
        # fall back to the generic platform string when none does.
        for name, func in plats.items():
            plat = func()
            if plat and plat[0]:
                plat = '%s %r' % (name, plat)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 is not available in every build.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    # Fail early, with a clear message, when a fixture file is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Bracket the run with the threading setup/cleanup helpers so the
    # cleanup happens even if the run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004608
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()