blob: 879d944f327e97d86a7423b15936d90b8edb6915 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Victor Stinner466e18e2019-07-01 19:01:52 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010031PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020033IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010034IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010036PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000037
Victor Stinner3ef63442019-02-19 18:06:03 +010038PROTOCOL_TO_TLS_VERSION = {}
39for proto, ver in (
40 ("PROTOCOL_SSLv23", "SSLv3"),
41 ("PROTOCOL_TLSv1", "TLSv1"),
42 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
43):
44 try:
45 proto = getattr(ssl, proto)
46 ver = getattr(ssl.TLSVersion, ver)
47 except AttributeError:
48 continue
49 PROTOCOL_TO_TLS_VERSION[proto] = ver
50
Christian Heimesefff7062013-11-21 03:35:02 +010051def data_file(*name):
52 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000053
Antoine Pitrou81564092010-10-08 23:06:24 +000054# The custom key and certificate files used in test_ssl are generated
55# using Lib/test/make_ssl_certs.py.
56# Other certificates are simply fetched from the Internet servers they
57# are meant to authenticate.
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000060BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000061ONLYCERT = data_file("ssl_cert.pem")
62ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000063BYTES_ONLYCERT = os.fsencode(ONLYCERT)
64BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020065CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
66ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
67KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000068CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000069BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010070CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
71CAFILE_CACERT = data_file("capath", "5ed36f99.0")
72
Christian Heimesbd5c7d22018-01-20 15:16:30 +010073CERTFILE_INFO = {
74 'issuer': ((('countryName', 'XY'),),
75 (('localityName', 'Castle Anthrax'),),
76 (('organizationName', 'Python Software Foundation'),),
77 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020078 'notAfter': 'Aug 26 14:23:15 2028 GMT',
79 'notBefore': 'Aug 29 14:23:15 2018 GMT',
80 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081 'subject': ((('countryName', 'XY'),),
82 (('localityName', 'Castle Anthrax'),),
83 (('organizationName', 'Python Software Foundation'),),
84 (('commonName', 'localhost'),)),
85 'subjectAltName': (('DNS', 'localhost'),),
86 'version': 3
87}
Antoine Pitrou152efa22010-05-16 18:19:27 +000088
Christian Heimes22587792013-11-21 23:56:13 +010089# empty CRL
90CRLFILE = data_file("revocation.crl")
91
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010092# Two keys and certs signed by the same CA (for SNI tests)
93SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020094SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010095
96SIGNED_CERTFILE_INFO = {
97 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
98 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
99 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
100 'issuer': ((('countryName', 'XY'),),
101 (('organizationName', 'Python Software Foundation CA'),),
102 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200103 'notAfter': 'Jul 7 14:23:16 2028 GMT',
104 'notBefore': 'Aug 29 14:23:16 2018 GMT',
105 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100106 'subject': ((('countryName', 'XY'),),
107 (('localityName', 'Castle Anthrax'),),
108 (('organizationName', 'Python Software Foundation'),),
109 (('commonName', 'localhost'),)),
110 'subjectAltName': (('DNS', 'localhost'),),
111 'version': 3
112}
113
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100114SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200115SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100116SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
117SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
118
Martin Panter3840b2a2016-03-27 01:53:46 +0000119# Same certificate as pycacert.pem, but without extra text in file
120SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200121# cert with all kinds of subject alt names
122ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100123IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100124
Martin Panter3d81d932016-01-14 09:36:00 +0000125REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000126
127EMPTYCERT = data_file("nullcert.pem")
128BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000129NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000130BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200131NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200132NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100133TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000134
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200135DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100136BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000137
Christian Heimes358cfd42016-09-10 22:43:48 +0200138# Not defined in all versions of OpenSSL
139OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
140OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
141OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
142OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100143OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200144
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100145
Thomas Woutersed03b412007-08-28 21:37:11 +0000146def handle_error(prefix):
147 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000148 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000149 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000150
Antoine Pitroub5218772010-05-21 09:56:06 +0000151def can_clear_options():
152 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200153 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000154
155def no_sslv2_implies_sslv3_hello():
156 # 0.9.7h or higher
157 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
158
Christian Heimes2427b502013-11-23 11:24:32 +0100159def have_verify_flags():
160 # 0.9.8 or higher
161 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
162
Christian Heimesb7b92252018-02-25 09:49:31 +0100163def _have_secp_curves():
164 if not ssl.HAS_ECDH:
165 return False
166 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
167 try:
168 ctx.set_ecdh_curve("secp384r1")
169 except ValueError:
170 return False
171 else:
172 return True
173
174
175HAVE_SECP_CURVES = _have_secp_curves()
176
177
Antoine Pitrouc695c952014-04-28 20:57:36 +0200178def utc_offset(): #NOTE: ignore issues like #1647654
179 # local time = utc time + utc offset
180 if time.daylight and time.localtime().tm_isdst > 0:
181 return -time.altzone # seconds
182 return -time.timezone
183
Christian Heimes9424bb42013-06-17 15:32:57 +0200184def asn1time(cert_time):
185 # Some versions of OpenSSL ignore seconds, see #18207
186 # 0.9.8.i
187 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
188 fmt = "%b %d %H:%M:%S %Y GMT"
189 dt = datetime.datetime.strptime(cert_time, fmt)
190 dt = dt.replace(second=0)
191 cert_time = dt.strftime(fmt)
192 # %d adds leading zero but ASN1_TIME_print() uses leading space
193 if cert_time[4] == "0":
194 cert_time = cert_time[:4] + " " + cert_time[5:]
195
196 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000197
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100198needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
199
Antoine Pitrou23df4832010-08-04 17:14:06 +0000200
Christian Heimesd0486372016-09-10 23:23:33 +0200201def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
202 cert_reqs=ssl.CERT_NONE, ca_certs=None,
203 ciphers=None, certfile=None, keyfile=None,
204 **kwargs):
205 context = ssl.SSLContext(ssl_version)
206 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200207 if cert_reqs == ssl.CERT_NONE:
208 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200209 context.verify_mode = cert_reqs
210 if ca_certs is not None:
211 context.load_verify_locations(ca_certs)
212 if certfile is not None or keyfile is not None:
213 context.load_cert_chain(certfile, keyfile)
214 if ciphers is not None:
215 context.set_ciphers(ciphers)
216 return context.wrap_socket(sock, **kwargs)
217
Christian Heimesa170fa12017-09-15 20:27:30 +0200218
219def testing_context(server_cert=SIGNED_CERTFILE):
220 """Create context
221
222 client_context, server_context, hostname = testing_context()
223 """
224 if server_cert == SIGNED_CERTFILE:
225 hostname = SIGNED_CERTFILE_HOSTNAME
226 elif server_cert == SIGNED_CERTFILE2:
227 hostname = SIGNED_CERTFILE2_HOSTNAME
228 else:
229 raise ValueError(server_cert)
230
231 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
232 client_context.load_verify_locations(SIGNING_CA)
233
234 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
235 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200236 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200237
238 return client_context, server_context, hostname
239
240
Antoine Pitrou152efa22010-05-16 18:19:27 +0000241class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000242
Antoine Pitrou480a1242010-04-28 21:37:09 +0000243 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000244 ssl.CERT_NONE
245 ssl.CERT_OPTIONAL
246 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100247 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100248 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100249 if ssl.HAS_ECDH:
250 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100251 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
252 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000253 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100254 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700255 ssl.OP_NO_SSLv2
256 ssl.OP_NO_SSLv3
257 ssl.OP_NO_TLSv1
258 ssl.OP_NO_TLSv1_3
259 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
260 ssl.OP_NO_TLSv1_1
261 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200262 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000263
Christian Heimes9d50ab52018-02-27 10:17:30 +0100264 def test_private_init(self):
265 with self.assertRaisesRegex(TypeError, "public constructor"):
266 with socket.socket() as s:
267 ssl.SSLSocket(s)
268
Antoine Pitrou172f0252014-04-18 20:33:08 +0200269 def test_str_for_enums(self):
270 # Make sure that the PROTOCOL_* constants have enum-like string
271 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200272 proto = ssl.PROTOCOL_TLS
273 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200274 ctx = ssl.SSLContext(proto)
275 self.assertIs(ctx.protocol, proto)
276
Antoine Pitrou480a1242010-04-28 21:37:09 +0000277 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000279 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 sys.stdout.write("\n RAND_status is %d (%s)\n"
281 % (v, (v and "sufficient randomness") or
282 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200283
284 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
285 self.assertEqual(len(data), 16)
286 self.assertEqual(is_cryptographic, v == 1)
287 if v:
288 data = ssl.RAND_bytes(16)
289 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200290 else:
291 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200292
Victor Stinner1e81a392013-12-19 16:47:04 +0100293 # negative num is invalid
294 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
295 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
296
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100297 if hasattr(ssl, 'RAND_egd'):
298 self.assertRaises(TypeError, ssl.RAND_egd, 1)
299 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000300 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200301 ssl.RAND_add(b"this is a random bytes object", 75.0)
302 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000303
Christian Heimesf77b4b22013-08-21 13:26:05 +0200304 @unittest.skipUnless(os.name == 'posix', 'requires posix')
305 def test_random_fork(self):
306 status = ssl.RAND_status()
307 if not status:
308 self.fail("OpenSSL's PRNG has insufficient randomness")
309
310 rfd, wfd = os.pipe()
311 pid = os.fork()
312 if pid == 0:
313 try:
314 os.close(rfd)
315 child_random = ssl.RAND_pseudo_bytes(16)[0]
316 self.assertEqual(len(child_random), 16)
317 os.write(wfd, child_random)
318 os.close(wfd)
319 except BaseException:
320 os._exit(1)
321 else:
322 os._exit(0)
323 else:
324 os.close(wfd)
325 self.addCleanup(os.close, rfd)
326 _, status = os.waitpid(pid, 0)
327 self.assertEqual(status, 0)
328
329 child_random = os.read(rfd, 16)
330 self.assertEqual(len(child_random), 16)
331 parent_random = ssl.RAND_pseudo_bytes(16)[0]
332 self.assertEqual(len(parent_random), 16)
333
334 self.assertNotEqual(child_random, parent_random)
335
Christian Heimese6dac002018-08-30 07:25:49 +0200336 maxDiff = None
337
Antoine Pitrou480a1242010-04-28 21:37:09 +0000338 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000339 # note that this uses an 'unofficial' function in _ssl.c,
340 # provided solely for this test, to exercise the certificate
341 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100342 self.assertEqual(
343 ssl._ssl._test_decode_cert(CERTFILE),
344 CERTFILE_INFO
345 )
346 self.assertEqual(
347 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
348 SIGNED_CERTFILE_INFO
349 )
350
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200351 # Issue #13034: the subjectAltName in some certificates
352 # (notably projects.developer.nokia.com:443) wasn't parsed
353 p = ssl._ssl._test_decode_cert(NOKIACERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 self.assertEqual(p['subjectAltName'],
357 (('DNS', 'projects.developer.nokia.com'),
358 ('DNS', 'projects.forum.nokia.com'))
359 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100360 # extra OCSP and AIA fields
361 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
362 self.assertEqual(p['caIssuers'],
363 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
364 self.assertEqual(p['crlDistributionPoints'],
365 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
Christian Heimesa37f5242019-01-15 23:47:42 +0100367 def test_parse_cert_CVE_2019_5010(self):
368 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
369 if support.verbose:
370 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
371 self.assertEqual(
372 p,
373 {
374 'issuer': (
375 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
376 'notAfter': 'Jun 14 18:00:58 2028 GMT',
377 'notBefore': 'Jun 18 18:00:58 2018 GMT',
378 'serialNumber': '02',
379 'subject': ((('countryName', 'UK'),),
380 (('commonName',
381 'codenomicon-vm-2.test.lal.cisco.com'),)),
382 'subjectAltName': (
383 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
384 'version': 3
385 }
386 )
387
Christian Heimes824f7f32013-08-17 00:54:47 +0200388 def test_parse_cert_CVE_2013_4238(self):
389 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
390 if support.verbose:
391 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
392 subject = ((('countryName', 'US'),),
393 (('stateOrProvinceName', 'Oregon'),),
394 (('localityName', 'Beaverton'),),
395 (('organizationName', 'Python Software Foundation'),),
396 (('organizationalUnitName', 'Python Core Development'),),
397 (('commonName', 'null.python.org\x00example.org'),),
398 (('emailAddress', 'python-dev@python.org'),))
399 self.assertEqual(p['subject'], subject)
400 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200401 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
402 san = (('DNS', 'altnull.python.org\x00example.com'),
403 ('email', 'null@python.org\x00user@example.org'),
404 ('URI', 'http://null.python.org\x00http://example.org'),
405 ('IP Address', '192.0.2.1'),
406 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
407 else:
408 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
409 san = (('DNS', 'altnull.python.org\x00example.com'),
410 ('email', 'null@python.org\x00user@example.org'),
411 ('URI', 'http://null.python.org\x00http://example.org'),
412 ('IP Address', '192.0.2.1'),
413 ('IP Address', '<invalid>'))
414
415 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200416
Christian Heimes1c03abd2016-09-06 23:25:35 +0200417 def test_parse_all_sans(self):
418 p = ssl._ssl._test_decode_cert(ALLSANFILE)
419 self.assertEqual(p['subjectAltName'],
420 (
421 ('DNS', 'allsans'),
422 ('othername', '<unsupported>'),
423 ('othername', '<unsupported>'),
424 ('email', 'user@example.org'),
425 ('DNS', 'www.example.org'),
426 ('DirName',
427 ((('countryName', 'XY'),),
428 (('localityName', 'Castle Anthrax'),),
429 (('organizationName', 'Python Software Foundation'),),
430 (('commonName', 'dirname example'),))),
431 ('URI', 'https://www.python.org/'),
432 ('IP Address', '127.0.0.1'),
433 ('IP Address', '0:0:0:0:0:0:0:1\n'),
434 ('Registered ID', '1.2.3.4.5')
435 )
436 )
437
Antoine Pitrou480a1242010-04-28 21:37:09 +0000438 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000439 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000440 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000441 d1 = ssl.PEM_cert_to_DER_cert(pem)
442 p2 = ssl.DER_cert_to_PEM_cert(d1)
443 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000444 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000445 if not p2.startswith(ssl.PEM_HEADER + '\n'):
446 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
447 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
448 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000449
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 def test_openssl_version(self):
451 n = ssl.OPENSSL_VERSION_NUMBER
452 t = ssl.OPENSSL_VERSION_INFO
453 s = ssl.OPENSSL_VERSION
454 self.assertIsInstance(n, int)
455 self.assertIsInstance(t, tuple)
456 self.assertIsInstance(s, str)
457 # Some sanity checks follow
458 # >= 0.9
459 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400460 # < 3.0
461 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000462 major, minor, fix, patch, status = t
463 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400464 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000465 self.assertGreaterEqual(minor, 0)
466 self.assertLess(minor, 256)
467 self.assertGreaterEqual(fix, 0)
468 self.assertLess(fix, 256)
469 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100470 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000471 self.assertGreaterEqual(status, 0)
472 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400473 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200474 if IS_LIBRESSL:
475 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100476 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400477 else:
478 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100479 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000480
Antoine Pitrou9d543662010-04-23 23:10:32 +0000481 @support.cpython_only
482 def test_refcycle(self):
483 # Issue #7943: an SSL object doesn't create reference cycles with
484 # itself.
485 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200486 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000487 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100488 with support.check_warnings(("", ResourceWarning)):
489 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100490 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000491
Antoine Pitroua468adc2010-09-14 14:43:44 +0000492 def test_wrapped_unconnected(self):
493 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200494 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000495 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200496 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100497 self.assertRaises(OSError, ss.recv, 1)
498 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
499 self.assertRaises(OSError, ss.recvfrom, 1)
500 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
501 self.assertRaises(OSError, ss.send, b'x')
502 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200503 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100504 self.assertRaises(NotImplementedError, ss.sendmsg,
505 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200506 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
507 self.assertRaises(NotImplementedError, ss.recvmsg_into,
508 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000509
Antoine Pitrou40f08742010-04-24 22:04:40 +0000510 def test_timeout(self):
511 # Issue #8524: when creating an SSL socket, the timeout of the
512 # original socket should be retained.
513 for timeout in (None, 0.0, 5.0):
514 s = socket.socket(socket.AF_INET)
515 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200516 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100517 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000518
Christian Heimesd0486372016-09-10 23:23:33 +0200519 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000520 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000521 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000522 "certfile must be specified",
523 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000524 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000525 "certfile must be specified for server-side operations",
526 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000527 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000528 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200529 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100530 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
531 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200532 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200533 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000534 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000535 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000536 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200537 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000538 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000539 ssl.wrap_socket(sock,
540 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000541 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200542 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000543 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000544 ssl.wrap_socket(sock,
545 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000546 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000547
Martin Panter3464ea22016-02-01 21:58:11 +0000548 def bad_cert_test(self, certfile):
549 """Check that trying to use the given client certificate fails"""
550 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
551 certfile)
552 sock = socket.socket()
553 self.addCleanup(sock.close)
554 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200555 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200556 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000557
558 def test_empty_cert(self):
559 """Wrapping with an empty cert file"""
560 self.bad_cert_test("nullcert.pem")
561
562 def test_malformed_cert(self):
563 """Wrapping with a badly formatted certificate (syntax error)"""
564 self.bad_cert_test("badcert.pem")
565
566 def test_malformed_key(self):
567 """Wrapping with a badly formatted key (syntax error)"""
568 self.bad_cert_test("badkey.pem")
569
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000570 def test_match_hostname(self):
571 def ok(cert, hostname):
572 ssl.match_hostname(cert, hostname)
573 def fail(cert, hostname):
574 self.assertRaises(ssl.CertificateError,
575 ssl.match_hostname, cert, hostname)
576
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100577 # -- Hostname matching --
578
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000579 cert = {'subject': ((('commonName', 'example.com'),),)}
580 ok(cert, 'example.com')
581 ok(cert, 'ExAmple.cOm')
582 fail(cert, 'www.example.com')
583 fail(cert, '.example.com')
584 fail(cert, 'example.org')
585 fail(cert, 'exampleXcom')
586
587 cert = {'subject': ((('commonName', '*.a.com'),),)}
588 ok(cert, 'foo.a.com')
589 fail(cert, 'bar.foo.a.com')
590 fail(cert, 'a.com')
591 fail(cert, 'Xa.com')
592 fail(cert, '.a.com')
593
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 # only match wildcards when they are the only thing
595 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000596 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 fail(cert, 'foo.com')
598 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 fail(cert, 'bar.com')
600 fail(cert, 'foo.a.com')
601 fail(cert, 'bar.foo.com')
602
Christian Heimes824f7f32013-08-17 00:54:47 +0200603 # NULL bytes are bad, CVE-2013-4073
604 cert = {'subject': ((('commonName',
605 'null.python.org\x00example.org'),),)}
606 ok(cert, 'null.python.org\x00example.org') # or raise an error?
607 fail(cert, 'example.org')
608 fail(cert, 'null.python.org')
609
Georg Brandl72c98d32013-10-27 07:16:53 +0100610 # error cases with wildcards
611 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
612 fail(cert, 'bar.foo.a.com')
613 fail(cert, 'a.com')
614 fail(cert, 'Xa.com')
615 fail(cert, '.a.com')
616
617 cert = {'subject': ((('commonName', 'a.*.com'),),)}
618 fail(cert, 'a.foo.com')
619 fail(cert, 'a..com')
620 fail(cert, 'a.com')
621
622 # wildcard doesn't match IDNA prefix 'xn--'
623 idna = 'püthon.python.org'.encode("idna").decode("ascii")
624 cert = {'subject': ((('commonName', idna),),)}
625 ok(cert, idna)
626 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
627 fail(cert, idna)
628 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
629 fail(cert, idna)
630
631 # wildcard in first fragment and IDNA A-labels in sequent fragments
632 # are supported.
633 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
634 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530635 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100637 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
639
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000640 # Slightly fake real-world example
641 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
642 'subject': ((('commonName', 'linuxfrz.org'),),),
643 'subjectAltName': (('DNS', 'linuxfr.org'),
644 ('DNS', 'linuxfr.com'),
645 ('othername', '<unsupported>'))}
646 ok(cert, 'linuxfr.org')
647 ok(cert, 'linuxfr.com')
648 # Not a "DNS" entry
649 fail(cert, '<unsupported>')
650 # When there is a subjectAltName, commonName isn't used
651 fail(cert, 'linuxfrz.org')
652
653 # A pristine real-world example
654 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),),
659 (('commonName', 'mail.google.com'),))}
660 ok(cert, 'mail.google.com')
661 fail(cert, 'gmail.com')
662 # Only commonName is considered
663 fail(cert, 'California')
664
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100665 # -- IPv4 matching --
666 cert = {'subject': ((('commonName', 'example.com'),),),
667 'subjectAltName': (('DNS', 'example.com'),
668 ('IP Address', '10.11.12.13'),
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700669 ('IP Address', '14.15.16.17'),
670 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100671 ok(cert, '10.11.12.13')
672 ok(cert, '14.15.16.17')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700673 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
674 fail(cert, '127.1')
675 fail(cert, '14.15.16.17 ')
676 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100677 fail(cert, '14.15.16.18')
678 fail(cert, 'example.net')
679
680 # -- IPv6 matching --
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700681 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100682 cert = {'subject': ((('commonName', 'example.com'),),),
683 'subjectAltName': (
684 ('DNS', 'example.com'),
685 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
686 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
687 ok(cert, '2001::cafe')
688 ok(cert, '2003::baba')
Miss Islington (bot)3cba3d32019-07-02 14:06:18 -0700689 fail(cert, '2003::baba ')
690 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100691 fail(cert, '2003::bebe')
692 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100693
694 # -- Miscellaneous --
695
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000696 # Neither commonName nor subjectAltName
697 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
698 'subject': ((('countryName', 'US'),),
699 (('stateOrProvinceName', 'California'),),
700 (('localityName', 'Mountain View'),),
701 (('organizationName', 'Google Inc'),))}
702 fail(cert, 'mail.google.com')
703
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200704 # No DNS entry in subjectAltName but a commonName
705 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
706 'subject': ((('countryName', 'US'),),
707 (('stateOrProvinceName', 'California'),),
708 (('localityName', 'Mountain View'),),
709 (('commonName', 'mail.google.com'),)),
710 'subjectAltName': (('othername', 'blabla'), )}
711 ok(cert, 'mail.google.com')
712
713 # No DNS entry subjectAltName and no commonName
714 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
715 'subject': ((('countryName', 'US'),),
716 (('stateOrProvinceName', 'California'),),
717 (('localityName', 'Mountain View'),),
718 (('organizationName', 'Google Inc'),)),
719 'subjectAltName': (('othername', 'blabla'),)}
720 fail(cert, 'google.com')
721
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000722 # Empty cert / no cert
723 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
724 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
725
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200726 # Issue #17980: avoid denials of service by refusing more than one
727 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100728 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
729 with self.assertRaisesRegex(
730 ssl.CertificateError,
731 "partial wildcards in leftmost label are not supported"):
732 ssl.match_hostname(cert, 'axxb.example.com')
733
734 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
735 with self.assertRaisesRegex(
736 ssl.CertificateError,
737 "wildcard can only be present in the leftmost label"):
738 ssl.match_hostname(cert, 'www.sub.example.com')
739
740 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
741 with self.assertRaisesRegex(
742 ssl.CertificateError,
743 "too many wildcards"):
744 ssl.match_hostname(cert, 'axxbxxc.example.com')
745
746 cert = {'subject': ((('commonName', '*'),),)}
747 with self.assertRaisesRegex(
748 ssl.CertificateError,
749 "sole wildcard without additional labels are not support"):
750 ssl.match_hostname(cert, 'host')
751
752 cert = {'subject': ((('commonName', '*.com'),),)}
753 with self.assertRaisesRegex(
754 ssl.CertificateError,
755 r"hostname 'com' doesn't match '\*.com'"):
756 ssl.match_hostname(cert, 'com')
757
758 # extra checks for _inet_paton()
759 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
760 with self.assertRaises(ValueError):
761 ssl._inet_paton(invalid)
762 for ipaddr in ['127.0.0.1', '192.168.0.1']:
763 self.assertTrue(ssl._inet_paton(ipaddr))
Miss Islington (bot)c2684c62019-06-30 08:42:22 -0700764 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100765 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
766 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200767
Antoine Pitroud5323212010-10-22 18:19:07 +0000768 def test_server_side(self):
769 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200770 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000771 with socket.socket() as sock:
772 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
773 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000774
Antoine Pitroud6494802011-07-21 01:11:30 +0200775 def test_unknown_channel_binding(self):
776 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200777 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200778 c = socket.socket(socket.AF_INET)
779 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200780 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100781 with self.assertRaises(ValueError):
782 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200783 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200784
785 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
786 "'tls-unique' channel binding not available")
787 def test_tls_unique_channel_binding(self):
788 # unconnected should return None for known type
789 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200790 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100791 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200792 # the same for server-side
793 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200794 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100795 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200796
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600797 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200798 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600799 r = repr(ss)
800 with self.assertWarns(ResourceWarning) as cm:
801 ss = None
802 support.gc_collect()
803 self.assertIn(r, str(cm.warning.args[0]))
804
Christian Heimes6d7ad132013-06-09 18:02:55 +0200805 def test_get_default_verify_paths(self):
806 paths = ssl.get_default_verify_paths()
807 self.assertEqual(len(paths), 6)
808 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
809
810 with support.EnvironmentVarGuard() as env:
811 env["SSL_CERT_DIR"] = CAPATH
812 env["SSL_CERT_FILE"] = CERTFILE
813 paths = ssl.get_default_verify_paths()
814 self.assertEqual(paths.cafile, CERTFILE)
815 self.assertEqual(paths.capath, CAPATH)
816
Christian Heimes44109d72013-11-22 01:51:30 +0100817 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
818 def test_enum_certificates(self):
819 self.assertTrue(ssl.enum_certificates("CA"))
820 self.assertTrue(ssl.enum_certificates("ROOT"))
821
822 self.assertRaises(TypeError, ssl.enum_certificates)
823 self.assertRaises(WindowsError, ssl.enum_certificates, "")
824
Christian Heimesc2d65e12013-11-22 16:13:55 +0100825 trust_oids = set()
826 for storename in ("CA", "ROOT"):
827 store = ssl.enum_certificates(storename)
828 self.assertIsInstance(store, list)
829 for element in store:
830 self.assertIsInstance(element, tuple)
831 self.assertEqual(len(element), 3)
832 cert, enc, trust = element
833 self.assertIsInstance(cert, bytes)
834 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
835 self.assertIsInstance(trust, (set, bool))
836 if isinstance(trust, set):
837 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100838
839 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100840 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200841
Christian Heimes46bebee2013-06-09 19:03:31 +0200842 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100843 def test_enum_crls(self):
844 self.assertTrue(ssl.enum_crls("CA"))
845 self.assertRaises(TypeError, ssl.enum_crls)
846 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200847
Christian Heimes44109d72013-11-22 01:51:30 +0100848 crls = ssl.enum_crls("CA")
849 self.assertIsInstance(crls, list)
850 for element in crls:
851 self.assertIsInstance(element, tuple)
852 self.assertEqual(len(element), 2)
853 self.assertIsInstance(element[0], bytes)
854 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200855
Christian Heimes46bebee2013-06-09 19:03:31 +0200856
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100857 def test_asn1object(self):
858 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
859 '1.3.6.1.5.5.7.3.1')
860
861 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
862 self.assertEqual(val, expected)
863 self.assertEqual(val.nid, 129)
864 self.assertEqual(val.shortname, 'serverAuth')
865 self.assertEqual(val.longname, 'TLS Web Server Authentication')
866 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
867 self.assertIsInstance(val, ssl._ASN1Object)
868 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
869
870 val = ssl._ASN1Object.fromnid(129)
871 self.assertEqual(val, expected)
872 self.assertIsInstance(val, ssl._ASN1Object)
873 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100874 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
875 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100876 for i in range(1000):
877 try:
878 obj = ssl._ASN1Object.fromnid(i)
879 except ValueError:
880 pass
881 else:
882 self.assertIsInstance(obj.nid, int)
883 self.assertIsInstance(obj.shortname, str)
884 self.assertIsInstance(obj.longname, str)
885 self.assertIsInstance(obj.oid, (str, type(None)))
886
887 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
888 self.assertEqual(val, expected)
889 self.assertIsInstance(val, ssl._ASN1Object)
890 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
891 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
892 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100893 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
894 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100895
Christian Heimes72d28502013-11-23 13:56:58 +0100896 def test_purpose_enum(self):
897 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
898 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
899 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
900 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
901 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
902 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
903 '1.3.6.1.5.5.7.3.1')
904
905 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
906 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
907 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
908 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
909 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
910 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
911 '1.3.6.1.5.5.7.3.2')
912
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100913 def test_unsupported_dtls(self):
914 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
915 self.addCleanup(s.close)
916 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200917 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100918 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200919 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100920 with self.assertRaises(NotImplementedError) as cx:
921 ctx.wrap_socket(s)
922 self.assertEqual(str(cx.exception), "only stream sockets are supported")
923
Antoine Pitrouc695c952014-04-28 20:57:36 +0200924 def cert_time_ok(self, timestring, timestamp):
925 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
926
927 def cert_time_fail(self, timestring):
928 with self.assertRaises(ValueError):
929 ssl.cert_time_to_seconds(timestring)
930
931 @unittest.skipUnless(utc_offset(),
932 'local time needs to be different from UTC')
933 def test_cert_time_to_seconds_timezone(self):
934 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
935 # results if local timezone is not UTC
936 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
937 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
938
939 def test_cert_time_to_seconds(self):
940 timestring = "Jan 5 09:34:43 2018 GMT"
941 ts = 1515144883.0
942 self.cert_time_ok(timestring, ts)
943 # accept keyword parameter, assert its name
944 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
945 # accept both %e and %d (space or zero generated by strftime)
946 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
947 # case-insensitive
948 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
949 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
950 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
951 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
952 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
953 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
954 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
955 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
956
957 newyear_ts = 1230768000.0
958 # leap seconds
959 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
960 # same timestamp
961 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
962
963 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
964 # allow 60th second (even if it is not a leap second)
965 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
966 # allow 2nd leap second for compatibility with time.strptime()
967 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
968 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
969
Mike53f7a7c2017-12-14 14:04:53 +0300970 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200971 # 99991231235959Z (rfc 5280)
972 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
973
974 @support.run_with_locale('LC_ALL', '')
975 def test_cert_time_to_seconds_locale(self):
976 # `cert_time_to_seconds()` should be locale independent
977
978 def local_february_name():
979 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
980
981 if local_february_name().lower() == 'feb':
982 self.skipTest("locale-specific month name needs to be "
983 "different from C locale")
984
985 # locale-independent
986 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
987 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
988
Martin Panter3840b2a2016-03-27 01:53:46 +0000989 def test_connect_ex_error(self):
990 server = socket.socket(socket.AF_INET)
991 self.addCleanup(server.close)
992 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200993 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000994 cert_reqs=ssl.CERT_REQUIRED)
995 self.addCleanup(s.close)
996 rc = s.connect_ex((HOST, port))
997 # Issue #19919: Windows machines or VMs hosted on Windows
998 # machines sometimes return EWOULDBLOCK.
999 errors = (
1000 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1001 errno.EWOULDBLOCK,
1002 )
1003 self.assertIn(rc, errors)
1004
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001005
Antoine Pitrou152efa22010-05-16 18:19:27 +00001006class ContextTests(unittest.TestCase):
1007
1008 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001009 for protocol in PROTOCOLS:
1010 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001011 ctx = ssl.SSLContext()
1012 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001013 self.assertRaises(ValueError, ssl.SSLContext, -1)
1014 self.assertRaises(ValueError, ssl.SSLContext, 42)
1015
1016 def test_protocol(self):
1017 for proto in PROTOCOLS:
1018 ctx = ssl.SSLContext(proto)
1019 self.assertEqual(ctx.protocol, proto)
1020
1021 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001023 ctx.set_ciphers("ALL")
1024 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001025 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001026 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001027
Christian Heimes892d66e2018-01-29 14:10:18 +01001028 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1029 "Test applies only to Python default ciphers")
1030 def test_python_ciphers(self):
1031 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1032 ciphers = ctx.get_ciphers()
1033 for suite in ciphers:
1034 name = suite['name']
1035 self.assertNotIn("PSK", name)
1036 self.assertNotIn("SRP", name)
1037 self.assertNotIn("MD5", name)
1038 self.assertNotIn("RC4", name)
1039 self.assertNotIn("3DES", name)
1040
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1042 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001044 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001045 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001046 self.assertIn('AES256-GCM-SHA384', names)
1047 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001048
Antoine Pitroub5218772010-05-21 09:56:06 +00001049 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001051 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001052 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001053 # SSLContext also enables these by default
1054 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001055 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1056 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001057 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001058 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001059 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001060 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001061 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1062 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001063 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001064 # Ubuntu has OP_NO_SSLv3 forced on by default
1065 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001066 else:
1067 with self.assertRaises(ValueError):
1068 ctx.options = 0
1069
Christian Heimesa170fa12017-09-15 20:27:30 +02001070 def test_verify_mode_protocol(self):
1071 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001072 # Default value
1073 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1074 ctx.verify_mode = ssl.CERT_OPTIONAL
1075 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1076 ctx.verify_mode = ssl.CERT_REQUIRED
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1078 ctx.verify_mode = ssl.CERT_NONE
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 with self.assertRaises(TypeError):
1081 ctx.verify_mode = None
1082 with self.assertRaises(ValueError):
1083 ctx.verify_mode = 42
1084
Christian Heimesa170fa12017-09-15 20:27:30 +02001085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1087 self.assertFalse(ctx.check_hostname)
1088
1089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1090 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1091 self.assertTrue(ctx.check_hostname)
1092
Christian Heimes61d478c2018-01-27 15:51:38 +01001093 def test_hostname_checks_common_name(self):
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1097 ctx.hostname_checks_common_name = True
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 ctx.hostname_checks_common_name = False
1100 self.assertFalse(ctx.hostname_checks_common_name)
1101 ctx.hostname_checks_common_name = True
1102 self.assertTrue(ctx.hostname_checks_common_name)
1103 else:
1104 with self.assertRaises(AttributeError):
1105 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001106
Christian Heimes698dde12018-02-27 11:54:43 +01001107 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1108 "required OpenSSL 1.1.0g")
1109 def test_min_max_version(self):
1110 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001111 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1112 # Fedora override the setting to TLS 1.0.
1113 self.assertIn(
1114 ctx.minimum_version,
Victor Stinner3ef63442019-02-19 18:06:03 +01001115 {ssl.TLSVersion.MINIMUM_SUPPORTED,
1116 # Fedora 29 uses TLS 1.0 by default
1117 ssl.TLSVersion.TLSv1,
1118 # RHEL 8 uses TLS 1.2 by default
1119 ssl.TLSVersion.TLSv1_2}
Christian Heimes698dde12018-02-27 11:54:43 +01001120 )
1121 self.assertEqual(
1122 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1123 )
1124
1125 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1126 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1127 self.assertEqual(
1128 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1129 )
1130 self.assertEqual(
1131 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1132 )
1133
1134 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1135 ctx.maximum_version = ssl.TLSVersion.TLSv1
1136 self.assertEqual(
1137 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1138 )
1139 self.assertEqual(
1140 ctx.maximum_version, ssl.TLSVersion.TLSv1
1141 )
1142
1143 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1144 self.assertEqual(
1145 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1146 )
1147
1148 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1149 self.assertIn(
1150 ctx.maximum_version,
1151 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1152 )
1153
1154 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1155 self.assertIn(
1156 ctx.minimum_version,
1157 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1158 )
1159
1160 with self.assertRaises(ValueError):
1161 ctx.minimum_version = 42
1162
1163 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1164
1165 self.assertEqual(
1166 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1167 )
1168 self.assertEqual(
1169 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1170 )
1171 with self.assertRaises(ValueError):
1172 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1173 with self.assertRaises(ValueError):
1174 ctx.maximum_version = ssl.TLSVersion.TLSv1
1175
1176
Christian Heimes2427b502013-11-23 11:24:32 +01001177 @unittest.skipUnless(have_verify_flags(),
1178 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001179 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001180 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001181 # default value
1182 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1183 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001184 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1185 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1186 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1187 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1188 ctx.verify_flags = ssl.VERIFY_DEFAULT
1189 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1190 # supports any value
1191 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1192 self.assertEqual(ctx.verify_flags,
1193 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1194 with self.assertRaises(TypeError):
1195 ctx.verify_flags = None
1196
Antoine Pitrou152efa22010-05-16 18:19:27 +00001197 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001198 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001199 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001200 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001201 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1202 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001203 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001204 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001205 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001206 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001207 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001208 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001209 ctx.load_cert_chain(EMPTYCERT)
1210 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001211 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001212 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1213 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1214 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001215 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001216 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001217 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001218 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001219 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001220 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1221 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001222 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001223 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001224 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001225 # Password protected key and cert
1226 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1227 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1228 ctx.load_cert_chain(CERTFILE_PROTECTED,
1229 password=bytearray(KEY_PASSWORD.encode()))
1230 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1231 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1232 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1233 bytearray(KEY_PASSWORD.encode()))
1234 with self.assertRaisesRegex(TypeError, "should be a string"):
1235 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1236 with self.assertRaises(ssl.SSLError):
1237 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1238 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1239 # openssl has a fixed limit on the password buffer.
1240 # PEM_BUFSIZE is generally set to 1kb.
1241 # Return a string larger than this.
1242 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1243 # Password callback
1244 def getpass_unicode():
1245 return KEY_PASSWORD
1246 def getpass_bytes():
1247 return KEY_PASSWORD.encode()
1248 def getpass_bytearray():
1249 return bytearray(KEY_PASSWORD.encode())
1250 def getpass_badpass():
1251 return "badpass"
1252 def getpass_huge():
1253 return b'a' * (1024 * 1024)
1254 def getpass_bad_type():
1255 return 9
1256 def getpass_exception():
1257 raise Exception('getpass error')
1258 class GetPassCallable:
1259 def __call__(self):
1260 return KEY_PASSWORD
1261 def getpass(self):
1262 return KEY_PASSWORD
1263 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1264 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1265 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1266 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1267 ctx.load_cert_chain(CERTFILE_PROTECTED,
1268 password=GetPassCallable().getpass)
1269 with self.assertRaises(ssl.SSLError):
1270 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1271 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1272 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1273 with self.assertRaisesRegex(TypeError, "must return a string"):
1274 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1275 with self.assertRaisesRegex(Exception, "getpass error"):
1276 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1277 # Make sure the password function isn't called if it isn't needed
1278 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001279
1280 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001281 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001282 ctx.load_verify_locations(CERTFILE)
1283 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1284 ctx.load_verify_locations(BYTES_CERTFILE)
1285 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1286 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001287 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001288 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001289 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001290 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001291 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001292 ctx.load_verify_locations(BADCERT)
1293 ctx.load_verify_locations(CERTFILE, CAPATH)
1294 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1295
Victor Stinner80f75e62011-01-29 11:31:20 +00001296 # Issue #10989: crash if the second argument type is invalid
1297 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1298
Christian Heimesefff7062013-11-21 03:35:02 +01001299 def test_load_verify_cadata(self):
1300 # test cadata
1301 with open(CAFILE_CACERT) as f:
1302 cacert_pem = f.read()
1303 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1304 with open(CAFILE_NEURONIO) as f:
1305 neuronio_pem = f.read()
1306 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1307
1308 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1311 ctx.load_verify_locations(cadata=cacert_pem)
1312 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1313 ctx.load_verify_locations(cadata=neuronio_pem)
1314 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1315 # cert already in hash table
1316 ctx.load_verify_locations(cadata=neuronio_pem)
1317 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1318
1319 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001320 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001321 combined = "\n".join((cacert_pem, neuronio_pem))
1322 ctx.load_verify_locations(cadata=combined)
1323 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1324
1325 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001326 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001327 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1328 neuronio_pem, "tail"]
1329 ctx.load_verify_locations(cadata="\n".join(combined))
1330 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1331
1332 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001333 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001334 ctx.load_verify_locations(cadata=cacert_der)
1335 ctx.load_verify_locations(cadata=neuronio_der)
1336 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1337 # cert already in hash table
1338 ctx.load_verify_locations(cadata=cacert_der)
1339 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1340
1341 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001342 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001343 combined = b"".join((cacert_der, neuronio_der))
1344 ctx.load_verify_locations(cadata=combined)
1345 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1346
1347 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001348 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001349 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1350
1351 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1352 ctx.load_verify_locations(cadata="broken")
1353 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1354 ctx.load_verify_locations(cadata=b"broken")
1355
1356
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001357 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001359 ctx.load_dh_params(DHFILE)
1360 if os.name != 'nt':
1361 ctx.load_dh_params(BYTES_DHFILE)
1362 self.assertRaises(TypeError, ctx.load_dh_params)
1363 self.assertRaises(TypeError, ctx.load_dh_params, None)
1364 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001365 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001366 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001367 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001368 ctx.load_dh_params(CERTFILE)
1369
Antoine Pitroub0182c82010-10-12 20:09:02 +00001370 def test_session_stats(self):
1371 for proto in PROTOCOLS:
1372 ctx = ssl.SSLContext(proto)
1373 self.assertEqual(ctx.session_stats(), {
1374 'number': 0,
1375 'connect': 0,
1376 'connect_good': 0,
1377 'connect_renegotiate': 0,
1378 'accept': 0,
1379 'accept_good': 0,
1380 'accept_renegotiate': 0,
1381 'hits': 0,
1382 'misses': 0,
1383 'timeouts': 0,
1384 'cache_full': 0,
1385 })
1386
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001387 def test_set_default_verify_paths(self):
1388 # There's not much we can do to test that it acts as expected,
1389 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001390 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001391 ctx.set_default_verify_paths()
1392
Antoine Pitrou501da612011-12-21 09:27:41 +01001393 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001394 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001395 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001396 ctx.set_ecdh_curve("prime256v1")
1397 ctx.set_ecdh_curve(b"prime256v1")
1398 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1399 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1400 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1401 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1402
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001403 @needs_sni
1404 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001405 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001406
1407 # set_servername_callback expects a callable, or None
1408 self.assertRaises(TypeError, ctx.set_servername_callback)
1409 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1410 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1411 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1412
1413 def dummycallback(sock, servername, ctx):
1414 pass
1415 ctx.set_servername_callback(None)
1416 ctx.set_servername_callback(dummycallback)
1417
1418 @needs_sni
1419 def test_sni_callback_refcycle(self):
1420 # Reference cycles through the servername callback are detected
1421 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001423 def dummycallback(sock, servername, ctx, cycle=ctx):
1424 pass
1425 ctx.set_servername_callback(dummycallback)
1426 wr = weakref.ref(ctx)
1427 del ctx, dummycallback
1428 gc.collect()
1429 self.assertIs(wr(), None)
1430
Christian Heimes9a5395a2013-06-17 15:44:12 +02001431 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001432 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001433 self.assertEqual(ctx.cert_store_stats(),
1434 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1435 ctx.load_cert_chain(CERTFILE)
1436 self.assertEqual(ctx.cert_store_stats(),
1437 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1438 ctx.load_verify_locations(CERTFILE)
1439 self.assertEqual(ctx.cert_store_stats(),
1440 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001441 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001442 self.assertEqual(ctx.cert_store_stats(),
1443 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1444
1445 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001446 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001447 self.assertEqual(ctx.get_ca_certs(), [])
1448 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1449 ctx.load_verify_locations(CERTFILE)
1450 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001451 # but CAFILE_CACERT is a CA cert
1452 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001453 self.assertEqual(ctx.get_ca_certs(),
1454 [{'issuer': ((('organizationName', 'Root CA'),),
1455 (('organizationalUnitName', 'http://www.cacert.org'),),
1456 (('commonName', 'CA Cert Signing Authority'),),
1457 (('emailAddress', 'support@cacert.org'),)),
1458 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1459 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1460 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001461 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001462 'subject': ((('organizationName', 'Root CA'),),
1463 (('organizationalUnitName', 'http://www.cacert.org'),),
1464 (('commonName', 'CA Cert Signing Authority'),),
1465 (('emailAddress', 'support@cacert.org'),)),
1466 'version': 3}])
1467
Martin Panterb55f8b72016-01-14 12:53:56 +00001468 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001469 pem = f.read()
1470 der = ssl.PEM_cert_to_DER_cert(pem)
1471 self.assertEqual(ctx.get_ca_certs(True), [der])
1472
Christian Heimes72d28502013-11-23 13:56:58 +01001473 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs()
1476
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001478 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1479 ctx.load_default_certs()
1480
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001482 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1483
Christian Heimesa170fa12017-09-15 20:27:30 +02001484 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001485 self.assertRaises(TypeError, ctx.load_default_certs, None)
1486 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1487
Benjamin Peterson91244e02014-10-03 18:17:15 -04001488 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001489 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001490 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001491 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001492 with support.EnvironmentVarGuard() as env:
1493 env["SSL_CERT_DIR"] = CAPATH
1494 env["SSL_CERT_FILE"] = CERTFILE
1495 ctx.load_default_certs()
1496 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1497
Benjamin Peterson91244e02014-10-03 18:17:15 -04001498 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001499 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001500 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001501 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001502 ctx.load_default_certs()
1503 stats = ctx.cert_store_stats()
1504
Christian Heimesa170fa12017-09-15 20:27:30 +02001505 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001506 with support.EnvironmentVarGuard() as env:
1507 env["SSL_CERT_DIR"] = CAPATH
1508 env["SSL_CERT_FILE"] = CERTFILE
1509 ctx.load_default_certs()
1510 stats["x509"] += 1
1511 self.assertEqual(ctx.cert_store_stats(), stats)
1512
Christian Heimes358cfd42016-09-10 22:43:48 +02001513 def _assert_context_options(self, ctx):
1514 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1515 if OP_NO_COMPRESSION != 0:
1516 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1517 OP_NO_COMPRESSION)
1518 if OP_SINGLE_DH_USE != 0:
1519 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1520 OP_SINGLE_DH_USE)
1521 if OP_SINGLE_ECDH_USE != 0:
1522 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1523 OP_SINGLE_ECDH_USE)
1524 if OP_CIPHER_SERVER_PREFERENCE != 0:
1525 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1526 OP_CIPHER_SERVER_PREFERENCE)
1527
Christian Heimes4c05b472013-11-23 15:58:30 +01001528 def test_create_default_context(self):
1529 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001530
Christian Heimesa170fa12017-09-15 20:27:30 +02001531 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001532 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001533 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001534 self._assert_context_options(ctx)
1535
Christian Heimes4c05b472013-11-23 15:58:30 +01001536 with open(SIGNING_CA) as f:
1537 cadata = f.read()
1538 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1539 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001540 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001542 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001543
1544 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001545 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001546 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001547 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001548
Christian Heimes67986f92013-11-23 22:43:47 +01001549 def test__create_stdlib_context(self):
1550 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001551 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001552 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001553 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001554 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001555
1556 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1557 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1558 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001559 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001560
1561 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001562 cert_reqs=ssl.CERT_REQUIRED,
1563 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001564 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1565 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001566 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001567 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001568
1569 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001570 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001571 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001572 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001573
Christian Heimes1aa9a752013-12-02 02:41:19 +01001574 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001575 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001576 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001577 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001578
Christian Heimese82c0342017-09-15 20:29:57 +02001579 # Auto set CERT_REQUIRED
1580 ctx.check_hostname = True
1581 self.assertTrue(ctx.check_hostname)
1582 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1583 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584 ctx.verify_mode = ssl.CERT_REQUIRED
1585 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001586 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001587
Christian Heimese82c0342017-09-15 20:29:57 +02001588 # Changing verify_mode does not affect check_hostname
1589 ctx.check_hostname = False
1590 ctx.verify_mode = ssl.CERT_NONE
1591 ctx.check_hostname = False
1592 self.assertFalse(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1594 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001595 ctx.check_hostname = True
1596 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001597 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1598
1599 ctx.check_hostname = False
1600 ctx.verify_mode = ssl.CERT_OPTIONAL
1601 ctx.check_hostname = False
1602 self.assertFalse(ctx.check_hostname)
1603 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1604 # keep CERT_OPTIONAL
1605 ctx.check_hostname = True
1606 self.assertTrue(ctx.check_hostname)
1607 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001608
1609 # Cannot set CERT_NONE with check_hostname enabled
1610 with self.assertRaises(ValueError):
1611 ctx.verify_mode = ssl.CERT_NONE
1612 ctx.check_hostname = False
1613 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001614 ctx.verify_mode = ssl.CERT_NONE
1615 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001616
Christian Heimes5fe668c2016-09-12 00:01:11 +02001617 def test_context_client_server(self):
1618 # PROTOCOL_TLS_CLIENT has sane defaults
1619 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1620 self.assertTrue(ctx.check_hostname)
1621 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1622
1623 # PROTOCOL_TLS_SERVER has different but also sane defaults
1624 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1625 self.assertFalse(ctx.check_hostname)
1626 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1627
Christian Heimes4df60f12017-09-15 20:26:05 +02001628 def test_context_custom_class(self):
1629 class MySSLSocket(ssl.SSLSocket):
1630 pass
1631
1632 class MySSLObject(ssl.SSLObject):
1633 pass
1634
1635 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1636 ctx.sslsocket_class = MySSLSocket
1637 ctx.sslobject_class = MySSLObject
1638
1639 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1640 self.assertIsInstance(sock, MySSLSocket)
1641 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1642 self.assertIsInstance(obj, MySSLObject)
1643
Christian Heimes78c7d522019-06-03 21:00:10 +02001644 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1645 def test_num_tickest(self):
1646 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1647 self.assertEqual(ctx.num_tickets, 2)
1648 ctx.num_tickets = 1
1649 self.assertEqual(ctx.num_tickets, 1)
1650 ctx.num_tickets = 0
1651 self.assertEqual(ctx.num_tickets, 0)
1652 with self.assertRaises(ValueError):
1653 ctx.num_tickets = -1
1654 with self.assertRaises(TypeError):
1655 ctx.num_tickets = None
1656
1657 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1658 self.assertEqual(ctx.num_tickets, 2)
1659 with self.assertRaises(ValueError):
1660 ctx.num_tickets = 1
1661
Antoine Pitrou152efa22010-05-16 18:19:27 +00001662
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001663class SSLErrorTests(unittest.TestCase):
1664
1665 def test_str(self):
1666 # The str() of a SSLError doesn't include the errno
1667 e = ssl.SSLError(1, "foo")
1668 self.assertEqual(str(e), "foo")
1669 self.assertEqual(e.errno, 1)
1670 # Same for a subclass
1671 e = ssl.SSLZeroReturnError(1, "foo")
1672 self.assertEqual(str(e), "foo")
1673 self.assertEqual(e.errno, 1)
1674
1675 def test_lib_reason(self):
1676 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001677 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001678 with self.assertRaises(ssl.SSLError) as cm:
1679 ctx.load_dh_params(CERTFILE)
1680 self.assertEqual(cm.exception.library, 'PEM')
1681 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1682 s = str(cm.exception)
1683 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1684
1685 def test_subclass(self):
1686 # Check that the appropriate SSLError subclass is raised
1687 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001688 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1689 ctx.check_hostname = False
1690 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001691 with socket.create_server(("127.0.0.1", 0)) as s:
1692 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001693 c.setblocking(False)
1694 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001695 with self.assertRaises(ssl.SSLWantReadError) as cm:
1696 c.do_handshake()
1697 s = str(cm.exception)
1698 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1699 # For compatibility
1700 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1701
1702
Christian Heimes61d478c2018-01-27 15:51:38 +01001703 def test_bad_server_hostname(self):
1704 ctx = ssl.create_default_context()
1705 with self.assertRaises(ValueError):
1706 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1707 server_hostname="")
1708 with self.assertRaises(ValueError):
1709 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1710 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001711 with self.assertRaises(TypeError):
1712 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1713 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001714
1715
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001716class MemoryBIOTests(unittest.TestCase):
1717
1718 def test_read_write(self):
1719 bio = ssl.MemoryBIO()
1720 bio.write(b'foo')
1721 self.assertEqual(bio.read(), b'foo')
1722 self.assertEqual(bio.read(), b'')
1723 bio.write(b'foo')
1724 bio.write(b'bar')
1725 self.assertEqual(bio.read(), b'foobar')
1726 self.assertEqual(bio.read(), b'')
1727 bio.write(b'baz')
1728 self.assertEqual(bio.read(2), b'ba')
1729 self.assertEqual(bio.read(1), b'z')
1730 self.assertEqual(bio.read(1), b'')
1731
1732 def test_eof(self):
1733 bio = ssl.MemoryBIO()
1734 self.assertFalse(bio.eof)
1735 self.assertEqual(bio.read(), b'')
1736 self.assertFalse(bio.eof)
1737 bio.write(b'foo')
1738 self.assertFalse(bio.eof)
1739 bio.write_eof()
1740 self.assertFalse(bio.eof)
1741 self.assertEqual(bio.read(2), b'fo')
1742 self.assertFalse(bio.eof)
1743 self.assertEqual(bio.read(1), b'o')
1744 self.assertTrue(bio.eof)
1745 self.assertEqual(bio.read(), b'')
1746 self.assertTrue(bio.eof)
1747
1748 def test_pending(self):
1749 bio = ssl.MemoryBIO()
1750 self.assertEqual(bio.pending, 0)
1751 bio.write(b'foo')
1752 self.assertEqual(bio.pending, 3)
1753 for i in range(3):
1754 bio.read(1)
1755 self.assertEqual(bio.pending, 3-i-1)
1756 for i in range(3):
1757 bio.write(b'x')
1758 self.assertEqual(bio.pending, i+1)
1759 bio.read()
1760 self.assertEqual(bio.pending, 0)
1761
1762 def test_buffer_types(self):
1763 bio = ssl.MemoryBIO()
1764 bio.write(b'foo')
1765 self.assertEqual(bio.read(), b'foo')
1766 bio.write(bytearray(b'bar'))
1767 self.assertEqual(bio.read(), b'bar')
1768 bio.write(memoryview(b'baz'))
1769 self.assertEqual(bio.read(), b'baz')
1770
1771 def test_error_types(self):
1772 bio = ssl.MemoryBIO()
1773 self.assertRaises(TypeError, bio.write, 'foo')
1774 self.assertRaises(TypeError, bio.write, None)
1775 self.assertRaises(TypeError, bio.write, True)
1776 self.assertRaises(TypeError, bio.write, 1)
1777
1778
Christian Heimes9d50ab52018-02-27 10:17:30 +01001779class SSLObjectTests(unittest.TestCase):
1780 def test_private_init(self):
1781 bio = ssl.MemoryBIO()
1782 with self.assertRaisesRegex(TypeError, "public constructor"):
1783 ssl.SSLObject(bio, bio)
1784
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001785 def test_unwrap(self):
1786 client_ctx, server_ctx, hostname = testing_context()
1787 c_in = ssl.MemoryBIO()
1788 c_out = ssl.MemoryBIO()
1789 s_in = ssl.MemoryBIO()
1790 s_out = ssl.MemoryBIO()
1791 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1792 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1793
1794 # Loop on the handshake for a bit to get it settled
1795 for _ in range(5):
1796 try:
1797 client.do_handshake()
1798 except ssl.SSLWantReadError:
1799 pass
1800 if c_out.pending:
1801 s_in.write(c_out.read())
1802 try:
1803 server.do_handshake()
1804 except ssl.SSLWantReadError:
1805 pass
1806 if s_out.pending:
1807 c_in.write(s_out.read())
1808 # Now the handshakes should be complete (don't raise WantReadError)
1809 client.do_handshake()
1810 server.do_handshake()
1811
1812 # Now if we unwrap one side unilaterally, it should send close-notify
1813 # and raise WantReadError:
1814 with self.assertRaises(ssl.SSLWantReadError):
1815 client.unwrap()
1816
1817 # But server.unwrap() does not raise, because it reads the client's
1818 # close-notify:
1819 s_in.write(c_out.read())
1820 server.unwrap()
1821
1822 # And now that the client gets the server's close-notify, it doesn't
1823 # raise either.
1824 c_in.write(s_out.read())
1825 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001826
Martin Panter3840b2a2016-03-27 01:53:46 +00001827class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001828 """Tests that connect to a simple server running in the background"""
1829
1830 def setUp(self):
1831 server = ThreadedEchoServer(SIGNED_CERTFILE)
1832 self.server_addr = (HOST, server.port)
1833 server.__enter__()
1834 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001835
Antoine Pitrou480a1242010-04-28 21:37:09 +00001836 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001837 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001838 cert_reqs=ssl.CERT_NONE) as s:
1839 s.connect(self.server_addr)
1840 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001841 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001842
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001844 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001845 cert_reqs=ssl.CERT_REQUIRED,
1846 ca_certs=SIGNING_CA) as s:
1847 s.connect(self.server_addr)
1848 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001849 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001850
Martin Panter3840b2a2016-03-27 01:53:46 +00001851 def test_connect_fail(self):
1852 # This should fail because we have no verification certs. Connection
1853 # failure crashes ThreadedEchoServer, so run this in an independent
1854 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001855 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001856 cert_reqs=ssl.CERT_REQUIRED)
1857 self.addCleanup(s.close)
1858 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1859 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001860
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001861 def test_connect_ex(self):
1862 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001863 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001864 cert_reqs=ssl.CERT_REQUIRED,
1865 ca_certs=SIGNING_CA)
1866 self.addCleanup(s.close)
1867 self.assertEqual(0, s.connect_ex(self.server_addr))
1868 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001869
1870 def test_non_blocking_connect_ex(self):
1871 # Issue #11326: non-blocking connect_ex() should allow handshake
1872 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001873 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001874 cert_reqs=ssl.CERT_REQUIRED,
1875 ca_certs=SIGNING_CA,
1876 do_handshake_on_connect=False)
1877 self.addCleanup(s.close)
1878 s.setblocking(False)
1879 rc = s.connect_ex(self.server_addr)
1880 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1881 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1882 # Wait for connect to finish
1883 select.select([], [s], [], 5.0)
1884 # Non-blocking handshake
1885 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001886 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001887 s.do_handshake()
1888 break
1889 except ssl.SSLWantReadError:
1890 select.select([s], [], [], 5.0)
1891 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001892 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001893 # SSL established
1894 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001895
Antoine Pitrou152efa22010-05-16 18:19:27 +00001896 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001897 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001898 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001899 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1900 s.connect(self.server_addr)
1901 self.assertEqual({}, s.getpeercert())
1902 # Same with a server hostname
1903 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1904 server_hostname="dummy") as s:
1905 s.connect(self.server_addr)
1906 ctx.verify_mode = ssl.CERT_REQUIRED
1907 # This should succeed because we specify the root cert
1908 ctx.load_verify_locations(SIGNING_CA)
1909 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1910 s.connect(self.server_addr)
1911 cert = s.getpeercert()
1912 self.assertTrue(cert)
1913
1914 def test_connect_with_context_fail(self):
1915 # This should fail because we have no verification certs. Connection
1916 # failure crashes ThreadedEchoServer, so run this in an independent
1917 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001918 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001919 ctx.verify_mode = ssl.CERT_REQUIRED
1920 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1921 self.addCleanup(s.close)
1922 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1923 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001924
1925 def test_connect_capath(self):
1926 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001927 # NOTE: the subject hashing algorithm has been changed between
1928 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1929 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001930 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001931 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001932 ctx.verify_mode = ssl.CERT_REQUIRED
1933 ctx.load_verify_locations(capath=CAPATH)
1934 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1935 s.connect(self.server_addr)
1936 cert = s.getpeercert()
1937 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001938
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001940 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 ctx.verify_mode = ssl.CERT_REQUIRED
1942 ctx.load_verify_locations(capath=BYTES_CAPATH)
1943 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1944 s.connect(self.server_addr)
1945 cert = s.getpeercert()
1946 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001947
Christian Heimesefff7062013-11-21 03:35:02 +01001948 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001949 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001950 pem = f.read()
1951 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001952 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001953 ctx.verify_mode = ssl.CERT_REQUIRED
1954 ctx.load_verify_locations(cadata=pem)
1955 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1956 s.connect(self.server_addr)
1957 cert = s.getpeercert()
1958 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001959
Martin Panter3840b2a2016-03-27 01:53:46 +00001960 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001961 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001962 ctx.verify_mode = ssl.CERT_REQUIRED
1963 ctx.load_verify_locations(cadata=der)
1964 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1965 s.connect(self.server_addr)
1966 cert = s.getpeercert()
1967 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001968
Antoine Pitroue3220242010-04-24 11:13:53 +00001969 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1970 def test_makefile_close(self):
1971 # Issue #5238: creating a file-like object with makefile() shouldn't
1972 # delay closing the underlying "real socket" (here tested with its
1973 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001974 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001975 ss.connect(self.server_addr)
1976 fd = ss.fileno()
1977 f = ss.makefile()
1978 f.close()
1979 # The fd is still open
1980 os.read(fd, 0)
1981 # Closing the SSL socket should close the fd too
1982 ss.close()
1983 gc.collect()
1984 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001985 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001986 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001987
Antoine Pitrou480a1242010-04-28 21:37:09 +00001988 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001989 s = socket.socket(socket.AF_INET)
1990 s.connect(self.server_addr)
1991 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001992 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001993 cert_reqs=ssl.CERT_NONE,
1994 do_handshake_on_connect=False)
1995 self.addCleanup(s.close)
1996 count = 0
1997 while True:
1998 try:
1999 count += 1
2000 s.do_handshake()
2001 break
2002 except ssl.SSLWantReadError:
2003 select.select([s], [], [])
2004 except ssl.SSLWantWriteError:
2005 select.select([], [s], [])
2006 if support.verbose:
2007 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002008
Antoine Pitrou480a1242010-04-28 21:37:09 +00002009 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002011
Martin Panter3840b2a2016-03-27 01:53:46 +00002012 def test_get_server_certificate_fail(self):
2013 # Connection failure crashes ThreadedEchoServer, so run this in an
2014 # independent test method
2015 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002016
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002017 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002018 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002019 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2020 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002021 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002022 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2023 s.connect(self.server_addr)
2024 # Error checking can happen at instantiation or when connecting
2025 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2026 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002027 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002028 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2029 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002030
Christian Heimes9a5395a2013-06-17 15:44:12 +02002031 def test_get_ca_certs_capath(self):
2032 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002033 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002034 ctx.load_verify_locations(capath=CAPATH)
2035 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002036 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2037 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002038 s.connect(self.server_addr)
2039 cert = s.getpeercert()
2040 self.assertTrue(cert)
2041 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002042
Christian Heimes575596e2013-12-15 21:49:17 +01002043 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002044 def test_context_setget(self):
2045 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002046 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2047 ctx1.load_verify_locations(capath=CAPATH)
2048 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2049 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002050 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002051 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002052 ss.connect(self.server_addr)
2053 self.assertIs(ss.context, ctx1)
2054 self.assertIs(ss._sslobj.context, ctx1)
2055 ss.context = ctx2
2056 self.assertIs(ss.context, ctx2)
2057 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002058
2059 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2060 # A simple IO loop. Call func(*args) depending on the error we get
2061 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2062 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002063 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002064 count = 0
2065 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002066 if time.monotonic() > deadline:
2067 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002068 errno = None
2069 count += 1
2070 try:
2071 ret = func(*args)
2072 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002073 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002074 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002075 raise
2076 errno = e.errno
2077 # Get any data from the outgoing BIO irrespective of any error, and
2078 # send it to the socket.
2079 buf = outgoing.read()
2080 sock.sendall(buf)
2081 # If there's no error, we're done. For WANT_READ, we need to get
2082 # data from the socket and put it in the incoming BIO.
2083 if errno is None:
2084 break
2085 elif errno == ssl.SSL_ERROR_WANT_READ:
2086 buf = sock.recv(32768)
2087 if buf:
2088 incoming.write(buf)
2089 else:
2090 incoming.write_eof()
2091 if support.verbose:
2092 sys.stdout.write("Needed %d calls to complete %s().\n"
2093 % (count, func.__name__))
2094 return ret
2095
Martin Panter3840b2a2016-03-27 01:53:46 +00002096 def test_bio_handshake(self):
2097 sock = socket.socket(socket.AF_INET)
2098 self.addCleanup(sock.close)
2099 sock.connect(self.server_addr)
2100 incoming = ssl.MemoryBIO()
2101 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002102 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2103 self.assertTrue(ctx.check_hostname)
2104 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002105 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002106 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2107 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002108 self.assertIs(sslobj._sslobj.owner, sslobj)
2109 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002110 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002111 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002112 self.assertRaises(ValueError, sslobj.getpeercert)
2113 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2114 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2115 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2116 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002117 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002118 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002119 self.assertTrue(sslobj.getpeercert())
2120 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2121 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2122 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002123 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002124 except ssl.SSLSyscallError:
2125 # If the server shuts down the TCP connection without sending a
2126 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2127 pass
2128 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2129
2130 def test_bio_read_write_data(self):
2131 sock = socket.socket(socket.AF_INET)
2132 self.addCleanup(sock.close)
2133 sock.connect(self.server_addr)
2134 incoming = ssl.MemoryBIO()
2135 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002136 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002137 ctx.verify_mode = ssl.CERT_NONE
2138 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2139 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2140 req = b'FOO\n'
2141 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2142 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2143 self.assertEqual(buf, b'foo\n')
2144 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002145
2146
Martin Panter3840b2a2016-03-27 01:53:46 +00002147class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002148
Martin Panter3840b2a2016-03-27 01:53:46 +00002149 def test_timeout_connect_ex(self):
2150 # Issue #12065: on a timeout, connect_ex() should return the original
2151 # errno (mimicking the behaviour of non-SSL sockets).
2152 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002153 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002154 cert_reqs=ssl.CERT_REQUIRED,
2155 do_handshake_on_connect=False)
2156 self.addCleanup(s.close)
2157 s.settimeout(0.0000001)
2158 rc = s.connect_ex((REMOTE_HOST, 443))
2159 if rc == 0:
2160 self.skipTest("REMOTE_HOST responded too quickly")
2161 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2162
2163 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2164 def test_get_server_certificate_ipv6(self):
2165 with support.transient_internet('ipv6.google.com'):
2166 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2167 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2168
Martin Panter3840b2a2016-03-27 01:53:46 +00002169
2170def _test_get_server_certificate(test, host, port, cert=None):
2171 pem = ssl.get_server_certificate((host, port))
2172 if not pem:
2173 test.fail("No server certificate on %s:%s!" % (host, port))
2174
2175 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2176 if not pem:
2177 test.fail("No server certificate on %s:%s!" % (host, port))
2178 if support.verbose:
2179 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2180
2181def _test_get_server_certificate_fail(test, host, port):
2182 try:
2183 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2184 except ssl.SSLError as x:
2185 #should fail
2186 if support.verbose:
2187 sys.stdout.write("%s\n" % x)
2188 else:
2189 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2190
2191
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002192from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002194class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002195
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002196 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002197
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002198 """A mildly complicated class, because we want it to work both
2199 with and without the SSL wrapper around the socket connection, so
2200 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002201
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002202 def __init__(self, server, connsock, addr):
2203 self.server = server
2204 self.running = False
2205 self.sock = connsock
2206 self.addr = addr
2207 self.sock.setblocking(1)
2208 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002209 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002210 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002211
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002212 def wrap_conn(self):
2213 try:
2214 self.sslconn = self.server.context.wrap_socket(
2215 self.sock, server_side=True)
2216 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2217 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002218 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002219 # We treat ConnectionResetError as though it were an
2220 # SSLError - OpenSSL on Ubuntu abruptly closes the
2221 # connection when asked to use an unsupported protocol.
2222 #
Christian Heimes529525f2018-05-23 22:24:45 +02002223 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2224 # tries to send session tickets after handshake.
2225 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002226 #
2227 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2228 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002229 self.server.conn_errors.append(str(e))
2230 if self.server.chatty:
2231 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2232 self.running = False
2233 self.close()
2234 return False
2235 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002236 # OSError may occur with wrong protocols, e.g. both
2237 # sides use PROTOCOL_TLS_SERVER.
2238 #
2239 # XXX Various errors can have happened here, for example
2240 # a mismatching protocol version, an invalid certificate,
2241 # or a low-level bug. This should be made more discriminating.
2242 #
2243 # bpo-31323: Store the exception as string to prevent
2244 # a reference leak: server -> conn_errors -> exception
2245 # -> traceback -> self (ConnectionHandler) -> server
2246 self.server.conn_errors.append(str(e))
2247 if self.server.chatty:
2248 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2249 self.running = False
2250 self.server.stop()
2251 self.close()
2252 return False
2253 else:
2254 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2255 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2256 cert = self.sslconn.getpeercert()
2257 if support.verbose and self.server.chatty:
2258 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2259 cert_binary = self.sslconn.getpeercert(True)
2260 if support.verbose and self.server.chatty:
2261 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2262 cipher = self.sslconn.cipher()
2263 if support.verbose and self.server.chatty:
2264 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2265 sys.stdout.write(" server: selected protocol is now "
2266 + str(self.sslconn.selected_npn_protocol()) + "\n")
2267 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002268
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002269 def read(self):
2270 if self.sslconn:
2271 return self.sslconn.read()
2272 else:
2273 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002274
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002275 def write(self, bytes):
2276 if self.sslconn:
2277 return self.sslconn.write(bytes)
2278 else:
2279 return self.sock.send(bytes)
2280
2281 def close(self):
2282 if self.sslconn:
2283 self.sslconn.close()
2284 else:
2285 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002286
Antoine Pitrou480a1242010-04-28 21:37:09 +00002287 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002288 self.running = True
2289 if not self.server.starttls_server:
2290 if not self.wrap_conn():
2291 return
2292 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002293 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 msg = self.read()
2295 stripped = msg.strip()
2296 if not stripped:
2297 # eof, so quit this handler
2298 self.running = False
2299 try:
2300 self.sock = self.sslconn.unwrap()
2301 except OSError:
2302 # Many tests shut the TCP connection down
2303 # without an SSL shutdown. This causes
2304 # unwrap() to raise OSError with errno=0!
2305 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002306 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002307 self.sslconn = None
2308 self.close()
2309 elif stripped == b'over':
2310 if support.verbose and self.server.connectionchatty:
2311 sys.stdout.write(" server: client closed connection\n")
2312 self.close()
2313 return
2314 elif (self.server.starttls_server and
2315 stripped == b'STARTTLS'):
2316 if support.verbose and self.server.connectionchatty:
2317 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2318 self.write(b"OK\n")
2319 if not self.wrap_conn():
2320 return
2321 elif (self.server.starttls_server and self.sslconn
2322 and stripped == b'ENDTLS'):
2323 if support.verbose and self.server.connectionchatty:
2324 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2325 self.write(b"OK\n")
2326 self.sock = self.sslconn.unwrap()
2327 self.sslconn = None
2328 if support.verbose and self.server.connectionchatty:
2329 sys.stdout.write(" server: connection is now unencrypted...\n")
2330 elif stripped == b'CB tls-unique':
2331 if support.verbose and self.server.connectionchatty:
2332 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2333 data = self.sslconn.get_channel_binding("tls-unique")
2334 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002335 elif stripped == b'PHA':
2336 if support.verbose and self.server.connectionchatty:
2337 sys.stdout.write(" server: initiating post handshake auth\n")
2338 try:
2339 self.sslconn.verify_client_post_handshake()
2340 except ssl.SSLError as e:
2341 self.write(repr(e).encode("us-ascii") + b"\n")
2342 else:
2343 self.write(b"OK\n")
2344 elif stripped == b'HASCERT':
2345 if self.sslconn.getpeercert() is not None:
2346 self.write(b'TRUE\n')
2347 else:
2348 self.write(b'FALSE\n')
2349 elif stripped == b'GETCERT':
2350 cert = self.sslconn.getpeercert()
2351 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002352 else:
2353 if (support.verbose and
2354 self.server.connectionchatty):
2355 ctype = (self.sslconn and "encrypted") or "unencrypted"
2356 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2357 % (msg, ctype, msg.lower(), ctype))
2358 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002359 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002360 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2361 # when connection is not shut down gracefully.
2362 if self.server.chatty and support.verbose:
2363 sys.stdout.write(
2364 " Connection reset by peer: {}\n".format(
2365 self.addr)
2366 )
2367 self.close()
2368 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002369 except ssl.SSLError as err:
2370 # On Windows sometimes test_pha_required_nocert receives the
2371 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2372 # before the 'tlsv13 alert certificate required' exception.
2373 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2374 # is received test_pha_required_nocert fails with ConnectionResetError
2375 # because the underlying socket is closed
2376 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2377 if self.server.chatty and support.verbose:
2378 sys.stdout.write(err.args[1])
2379 # test_pha_required_nocert is expecting this exception
2380 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002381 except OSError:
2382 if self.server.chatty:
2383 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002384 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002385 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002386
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002387 # normally, we'd just stop here, but for the test
2388 # harness, we want to stop the server
2389 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002390
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002391 def __init__(self, certificate=None, ssl_version=None,
2392 certreqs=None, cacerts=None,
2393 chatty=True, connectionchatty=False, starttls_server=False,
2394 npn_protocols=None, alpn_protocols=None,
2395 ciphers=None, context=None):
2396 if context:
2397 self.context = context
2398 else:
2399 self.context = ssl.SSLContext(ssl_version
2400 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002401 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002402 self.context.verify_mode = (certreqs if certreqs is not None
2403 else ssl.CERT_NONE)
2404 if cacerts:
2405 self.context.load_verify_locations(cacerts)
2406 if certificate:
2407 self.context.load_cert_chain(certificate)
2408 if npn_protocols:
2409 self.context.set_npn_protocols(npn_protocols)
2410 if alpn_protocols:
2411 self.context.set_alpn_protocols(alpn_protocols)
2412 if ciphers:
2413 self.context.set_ciphers(ciphers)
2414 self.chatty = chatty
2415 self.connectionchatty = connectionchatty
2416 self.starttls_server = starttls_server
2417 self.sock = socket.socket()
2418 self.port = support.bind_port(self.sock)
2419 self.flag = None
2420 self.active = False
2421 self.selected_npn_protocols = []
2422 self.selected_alpn_protocols = []
2423 self.shared_ciphers = []
2424 self.conn_errors = []
2425 threading.Thread.__init__(self)
2426 self.daemon = True
2427
2428 def __enter__(self):
2429 self.start(threading.Event())
2430 self.flag.wait()
2431 return self
2432
2433 def __exit__(self, *args):
2434 self.stop()
2435 self.join()
2436
2437 def start(self, flag=None):
2438 self.flag = flag
2439 threading.Thread.start(self)
2440
2441 def run(self):
2442 self.sock.settimeout(0.05)
2443 self.sock.listen()
2444 self.active = True
2445 if self.flag:
2446 # signal an event
2447 self.flag.set()
2448 while self.active:
2449 try:
2450 newconn, connaddr = self.sock.accept()
2451 if support.verbose and self.chatty:
2452 sys.stdout.write(' server: new connection from '
2453 + repr(connaddr) + '\n')
2454 handler = self.ConnectionHandler(self, newconn, connaddr)
2455 handler.start()
2456 handler.join()
2457 except socket.timeout:
2458 pass
2459 except KeyboardInterrupt:
2460 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002461 except BaseException as e:
2462 if support.verbose and self.chatty:
2463 sys.stdout.write(
2464 ' connection handling failed: ' + repr(e) + '\n')
2465
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002466 self.sock.close()
2467
2468 def stop(self):
2469 self.active = False
2470
2471class AsyncoreEchoServer(threading.Thread):
2472
2473 # this one's based on asyncore.dispatcher
2474
2475 class EchoServer (asyncore.dispatcher):
2476
2477 class ConnectionHandler(asyncore.dispatcher_with_send):
2478
2479 def __init__(self, conn, certfile):
2480 self.socket = test_wrap_socket(conn, server_side=True,
2481 certfile=certfile,
2482 do_handshake_on_connect=False)
2483 asyncore.dispatcher_with_send.__init__(self, self.socket)
2484 self._ssl_accepting = True
2485 self._do_ssl_handshake()
2486
2487 def readable(self):
2488 if isinstance(self.socket, ssl.SSLSocket):
2489 while self.socket.pending() > 0:
2490 self.handle_read_event()
2491 return True
2492
2493 def _do_ssl_handshake(self):
2494 try:
2495 self.socket.do_handshake()
2496 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2497 return
2498 except ssl.SSLEOFError:
2499 return self.handle_close()
2500 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002501 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002502 except OSError as err:
2503 if err.args[0] == errno.ECONNABORTED:
2504 return self.handle_close()
2505 else:
2506 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002507
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002508 def handle_read(self):
2509 if self._ssl_accepting:
2510 self._do_ssl_handshake()
2511 else:
2512 data = self.recv(1024)
2513 if support.verbose:
2514 sys.stdout.write(" server: read %s from client\n" % repr(data))
2515 if not data:
2516 self.close()
2517 else:
2518 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002519
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002520 def handle_close(self):
2521 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002522 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002523 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002524
2525 def handle_error(self):
2526 raise
2527
Trent Nelson78520002008-04-10 20:54:35 +00002528 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 self.certfile = certfile
2530 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2531 self.port = support.bind_port(sock, '')
2532 asyncore.dispatcher.__init__(self, sock)
2533 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002534
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002536 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002537 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2538 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002539
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002540 def handle_error(self):
2541 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002542
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002543 def __init__(self, certfile):
2544 self.flag = None
2545 self.active = False
2546 self.server = self.EchoServer(certfile)
2547 self.port = self.server.port
2548 threading.Thread.__init__(self)
2549 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002550
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002551 def __str__(self):
2552 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002553
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002554 def __enter__(self):
2555 self.start(threading.Event())
2556 self.flag.wait()
2557 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002558
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002559 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002560 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 sys.stdout.write(" cleanup: stopping server.\n")
2562 self.stop()
2563 if support.verbose:
2564 sys.stdout.write(" cleanup: joining server thread.\n")
2565 self.join()
2566 if support.verbose:
2567 sys.stdout.write(" cleanup: successfully joined.\n")
2568 # make sure that ConnectionHandler is removed from socket_map
2569 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002570
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002571 def start (self, flag=None):
2572 self.flag = flag
2573 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002574
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002575 def run(self):
2576 self.active = True
2577 if self.flag:
2578 self.flag.set()
2579 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002580 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581 asyncore.loop(1)
2582 except:
2583 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002584
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002585 def stop(self):
2586 self.active = False
2587 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002588
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002589def server_params_test(client_context, server_context, indata=b"FOO\n",
2590 chatty=True, connectionchatty=False, sni_name=None,
2591 session=None):
2592 """
2593 Launch a server, connect a client to it and try various reads
2594 and writes.
2595 """
2596 stats = {}
2597 server = ThreadedEchoServer(context=server_context,
2598 chatty=chatty,
2599 connectionchatty=False)
2600 with server:
2601 with client_context.wrap_socket(socket.socket(),
2602 server_hostname=sni_name, session=session) as s:
2603 s.connect((HOST, server.port))
2604 for arg in [indata, bytearray(indata), memoryview(indata)]:
2605 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002606 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002607 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002608 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002609 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002610 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611 if connectionchatty:
2612 if support.verbose:
2613 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002614 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002615 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002616 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2617 % (outdata[:20], len(outdata),
2618 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002619 s.write(b"over\n")
2620 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002621 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002622 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002623 stats.update({
2624 'compression': s.compression(),
2625 'cipher': s.cipher(),
2626 'peercert': s.getpeercert(),
2627 'client_alpn_protocol': s.selected_alpn_protocol(),
2628 'client_npn_protocol': s.selected_npn_protocol(),
2629 'version': s.version(),
2630 'session_reused': s.session_reused,
2631 'session': s.session,
2632 })
2633 s.close()
2634 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2635 stats['server_npn_protocols'] = server.selected_npn_protocols
2636 stats['server_shared_ciphers'] = server.shared_ciphers
2637 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002638
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002639def try_protocol_combo(server_protocol, client_protocol, expect_success,
2640 certsreqs=None, server_options=0, client_options=0):
2641 """
2642 Try to SSL-connect using *client_protocol* to *server_protocol*.
2643 If *expect_success* is true, assert that the connection succeeds,
2644 if it's false, assert that the connection fails.
2645 Also, if *expect_success* is a string, assert that it is the protocol
2646 version actually used by the connection.
2647 """
2648 if certsreqs is None:
2649 certsreqs = ssl.CERT_NONE
2650 certtype = {
2651 ssl.CERT_NONE: "CERT_NONE",
2652 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2653 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2654 }[certsreqs]
2655 if support.verbose:
2656 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2657 sys.stdout.write(formatstr %
2658 (ssl.get_protocol_name(client_protocol),
2659 ssl.get_protocol_name(server_protocol),
2660 certtype))
2661 client_context = ssl.SSLContext(client_protocol)
2662 client_context.options |= client_options
2663 server_context = ssl.SSLContext(server_protocol)
2664 server_context.options |= server_options
2665
Victor Stinner3ef63442019-02-19 18:06:03 +01002666 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2667 if (min_version is not None
2668 # SSLContext.minimum_version is only available on recent OpenSSL
2669 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2670 and hasattr(server_context, 'minimum_version')
2671 and server_protocol == ssl.PROTOCOL_TLS
2672 and server_context.minimum_version > min_version):
2673 # If OpenSSL configuration is strict and requires more recent TLS
2674 # version, we have to change the minimum to test old TLS versions.
2675 server_context.minimum_version = min_version
2676
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002677 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2678 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2679 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002680 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681 client_context.set_ciphers("ALL")
2682
2683 for ctx in (client_context, server_context):
2684 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002685 ctx.load_cert_chain(SIGNED_CERTFILE)
2686 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687 try:
2688 stats = server_params_test(client_context, server_context,
2689 chatty=False, connectionchatty=False)
2690 # Protocol mismatch can result in either an SSLError, or a
2691 # "Connection reset by peer" error.
2692 except ssl.SSLError:
2693 if expect_success:
2694 raise
2695 except OSError as e:
2696 if expect_success or e.errno != errno.ECONNRESET:
2697 raise
2698 else:
2699 if not expect_success:
2700 raise AssertionError(
2701 "Client protocol %s succeeded with server protocol %s!"
2702 % (ssl.get_protocol_name(client_protocol),
2703 ssl.get_protocol_name(server_protocol)))
2704 elif (expect_success is not True
2705 and expect_success != stats['version']):
2706 raise AssertionError("version mismatch: expected %r, got %r"
2707 % (expect_success, stats['version']))
2708
2709
2710class ThreadedTests(unittest.TestCase):
2711
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002712 def test_echo(self):
2713 """Basic test of an SSL client connecting to a server"""
2714 if support.verbose:
2715 sys.stdout.write("\n")
2716 for protocol in PROTOCOLS:
2717 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2718 continue
2719 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2720 context = ssl.SSLContext(protocol)
2721 context.load_cert_chain(CERTFILE)
2722 server_params_test(context, context,
2723 chatty=True, connectionchatty=True)
2724
Christian Heimesa170fa12017-09-15 20:27:30 +02002725 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002726
2727 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2728 server_params_test(client_context=client_context,
2729 server_context=server_context,
2730 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002731 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002732
2733 client_context.check_hostname = False
2734 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2735 with self.assertRaises(ssl.SSLError) as e:
2736 server_params_test(client_context=server_context,
2737 server_context=client_context,
2738 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002739 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002740 self.assertIn('called a function you should not call',
2741 str(e.exception))
2742
2743 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2744 with self.assertRaises(ssl.SSLError) as e:
2745 server_params_test(client_context=server_context,
2746 server_context=server_context,
2747 chatty=True, connectionchatty=True)
2748 self.assertIn('called a function you should not call',
2749 str(e.exception))
2750
2751 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2752 with self.assertRaises(ssl.SSLError) as e:
2753 server_params_test(client_context=server_context,
2754 server_context=client_context,
2755 chatty=True, connectionchatty=True)
2756 self.assertIn('called a function you should not call',
2757 str(e.exception))
2758
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002759 def test_getpeercert(self):
2760 if support.verbose:
2761 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002762
2763 client_context, server_context, hostname = testing_context()
2764 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002765 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002766 with client_context.wrap_socket(socket.socket(),
2767 do_handshake_on_connect=False,
2768 server_hostname=hostname) as s:
2769 s.connect((HOST, server.port))
2770 # getpeercert() raise ValueError while the handshake isn't
2771 # done.
2772 with self.assertRaises(ValueError):
2773 s.getpeercert()
2774 s.do_handshake()
2775 cert = s.getpeercert()
2776 self.assertTrue(cert, "Can't get peer certificate.")
2777 cipher = s.cipher()
2778 if support.verbose:
2779 sys.stdout.write(pprint.pformat(cert) + '\n')
2780 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2781 if 'subject' not in cert:
2782 self.fail("No subject field in certificate: %s." %
2783 pprint.pformat(cert))
2784 if ((('organizationName', 'Python Software Foundation'),)
2785 not in cert['subject']):
2786 self.fail(
2787 "Missing or invalid 'organizationName' field in certificate subject; "
2788 "should be 'Python Software Foundation'.")
2789 self.assertIn('notBefore', cert)
2790 self.assertIn('notAfter', cert)
2791 before = ssl.cert_time_to_seconds(cert['notBefore'])
2792 after = ssl.cert_time_to_seconds(cert['notAfter'])
2793 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002794
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002795 @unittest.skipUnless(have_verify_flags(),
2796 "verify_flags need OpenSSL > 0.9.8")
2797 def test_crl_check(self):
2798 if support.verbose:
2799 sys.stdout.write("\n")
2800
Christian Heimesa170fa12017-09-15 20:27:30 +02002801 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002802
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002803 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002804 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002805
2806 # VERIFY_DEFAULT should pass
2807 server = ThreadedEchoServer(context=server_context, chatty=True)
2808 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002809 with client_context.wrap_socket(socket.socket(),
2810 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002811 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002812 cert = s.getpeercert()
2813 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002814
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002815 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002816 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002817
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002818 server = ThreadedEchoServer(context=server_context, chatty=True)
2819 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002820 with client_context.wrap_socket(socket.socket(),
2821 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002822 with self.assertRaisesRegex(ssl.SSLError,
2823 "certificate verify failed"):
2824 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002825
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002826 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002827 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002828
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002829 server = ThreadedEchoServer(context=server_context, chatty=True)
2830 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002831 with client_context.wrap_socket(socket.socket(),
2832 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002833 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002834 cert = s.getpeercert()
2835 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002836
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002837 def test_check_hostname(self):
2838 if support.verbose:
2839 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002840
Christian Heimesa170fa12017-09-15 20:27:30 +02002841 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002842
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002843 # correct hostname should verify
2844 server = ThreadedEchoServer(context=server_context, chatty=True)
2845 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002846 with client_context.wrap_socket(socket.socket(),
2847 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002848 s.connect((HOST, server.port))
2849 cert = s.getpeercert()
2850 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002851
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002852 # incorrect hostname should raise an exception
2853 server = ThreadedEchoServer(context=server_context, chatty=True)
2854 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002855 with client_context.wrap_socket(socket.socket(),
2856 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002857 with self.assertRaisesRegex(
2858 ssl.CertificateError,
2859 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002860 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002861
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002862 # missing server_hostname arg should cause an exception, too
2863 server = ThreadedEchoServer(context=server_context, chatty=True)
2864 with server:
2865 with socket.socket() as s:
2866 with self.assertRaisesRegex(ValueError,
2867 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002868 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002869
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002870 def test_ecc_cert(self):
2871 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2872 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002873 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002874 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2875
2876 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2877 # load ECC cert
2878 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2879
2880 # correct hostname should verify
2881 server = ThreadedEchoServer(context=server_context, chatty=True)
2882 with server:
2883 with client_context.wrap_socket(socket.socket(),
2884 server_hostname=hostname) as s:
2885 s.connect((HOST, server.port))
2886 cert = s.getpeercert()
2887 self.assertTrue(cert, "Can't get peer certificate.")
2888 cipher = s.cipher()[0].split('-')
2889 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2890
2891 def test_dual_rsa_ecc(self):
2892 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2893 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002894 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2895 # algorithms.
2896 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002897 # only ECDSA certs
2898 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2899 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2900
2901 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2902 # load ECC and RSA key/cert pairs
2903 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2904 server_context.load_cert_chain(SIGNED_CERTFILE)
2905
2906 # correct hostname should verify
2907 server = ThreadedEchoServer(context=server_context, chatty=True)
2908 with server:
2909 with client_context.wrap_socket(socket.socket(),
2910 server_hostname=hostname) as s:
2911 s.connect((HOST, server.port))
2912 cert = s.getpeercert()
2913 self.assertTrue(cert, "Can't get peer certificate.")
2914 cipher = s.cipher()[0].split('-')
2915 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2916
Christian Heimes66e57422018-01-29 14:25:13 +01002917 def test_check_hostname_idn(self):
2918 if support.verbose:
2919 sys.stdout.write("\n")
2920
Christian Heimes11a14932018-02-24 02:35:08 +01002921 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002922 server_context.load_cert_chain(IDNSANSFILE)
2923
Christian Heimes11a14932018-02-24 02:35:08 +01002924 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002925 context.verify_mode = ssl.CERT_REQUIRED
2926 context.check_hostname = True
2927 context.load_verify_locations(SIGNING_CA)
2928
2929 # correct hostname should verify, when specified in several
2930 # different ways
2931 idn_hostnames = [
2932 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002933 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002934 ('xn--knig-5qa.idn.pythontest.net',
2935 'xn--knig-5qa.idn.pythontest.net'),
2936 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002937 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002938
2939 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002940 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002941 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2942 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2943 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002944 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2945
2946 # ('königsgäßchen.idna2008.pythontest.net',
2947 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2948 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2949 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2950 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2951 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2952
Christian Heimes66e57422018-01-29 14:25:13 +01002953 ]
2954 for server_hostname, expected_hostname in idn_hostnames:
2955 server = ThreadedEchoServer(context=server_context, chatty=True)
2956 with server:
2957 with context.wrap_socket(socket.socket(),
2958 server_hostname=server_hostname) as s:
2959 self.assertEqual(s.server_hostname, expected_hostname)
2960 s.connect((HOST, server.port))
2961 cert = s.getpeercert()
2962 self.assertEqual(s.server_hostname, expected_hostname)
2963 self.assertTrue(cert, "Can't get peer certificate.")
2964
Christian Heimes66e57422018-01-29 14:25:13 +01002965 # incorrect hostname should raise an exception
2966 server = ThreadedEchoServer(context=server_context, chatty=True)
2967 with server:
2968 with context.wrap_socket(socket.socket(),
2969 server_hostname="python.example.org") as s:
2970 with self.assertRaises(ssl.CertificateError):
2971 s.connect((HOST, server.port))
2972
Christian Heimes529525f2018-05-23 22:24:45 +02002973 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002974 """Connecting when the server rejects the client's certificate
2975
2976 Launch a server with CERT_REQUIRED, and check that trying to
2977 connect to it with a wrong client certificate fails.
2978 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002979 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002980 # load client cert that is not signed by trusted CA
2981 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002982 # require TLS client authentication
2983 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002984 # TLS 1.3 has different handshake
2985 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002986
2987 server = ThreadedEchoServer(
2988 context=server_context, chatty=True, connectionchatty=True,
2989 )
2990
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002991 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002992 client_context.wrap_socket(socket.socket(),
2993 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002994 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 # Expect either an SSL error about the server rejecting
2996 # the connection, or a low-level connection reset (which
2997 # sometimes happens on Windows)
2998 s.connect((HOST, server.port))
2999 except ssl.SSLError as e:
3000 if support.verbose:
3001 sys.stdout.write("\nSSLError is %r\n" % e)
3002 except OSError as e:
3003 if e.errno != errno.ECONNRESET:
3004 raise
3005 if support.verbose:
3006 sys.stdout.write("\nsocket.error is %r\n" % e)
3007 else:
3008 self.fail("Use of invalid cert should have failed!")
3009
Christian Heimes529525f2018-05-23 22:24:45 +02003010 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
3011 def test_wrong_cert_tls13(self):
3012 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003013 # load client cert that is not signed by trusted CA
3014 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003015 server_context.verify_mode = ssl.CERT_REQUIRED
3016 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3017 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3018
3019 server = ThreadedEchoServer(
3020 context=server_context, chatty=True, connectionchatty=True,
3021 )
3022 with server, \
3023 client_context.wrap_socket(socket.socket(),
3024 server_hostname=hostname) as s:
3025 # TLS 1.3 perform client cert exchange after handshake
3026 s.connect((HOST, server.port))
3027 try:
3028 s.write(b'data')
3029 s.read(4)
3030 except ssl.SSLError as e:
3031 if support.verbose:
3032 sys.stdout.write("\nSSLError is %r\n" % e)
3033 except OSError as e:
3034 if e.errno != errno.ECONNRESET:
3035 raise
3036 if support.verbose:
3037 sys.stdout.write("\nsocket.error is %r\n" % e)
3038 else:
3039 self.fail("Use of invalid cert should have failed!")
3040
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003041 def test_rude_shutdown(self):
3042 """A brutal shutdown of an SSL server should raise an OSError
3043 in the client when attempting handshake.
3044 """
3045 listener_ready = threading.Event()
3046 listener_gone = threading.Event()
3047
3048 s = socket.socket()
3049 port = support.bind_port(s, HOST)
3050
3051 # `listener` runs in a thread. It sits in an accept() until
3052 # the main thread connects. Then it rudely closes the socket,
3053 # and sets Event `listener_gone` to let the main thread know
3054 # the socket is gone.
3055 def listener():
3056 s.listen()
3057 listener_ready.set()
3058 newsock, addr = s.accept()
3059 newsock.close()
3060 s.close()
3061 listener_gone.set()
3062
3063 def connector():
3064 listener_ready.wait()
3065 with socket.socket() as c:
3066 c.connect((HOST, port))
3067 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003068 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003069 ssl_sock = test_wrap_socket(c)
3070 except OSError:
3071 pass
3072 else:
3073 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003074
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003075 t = threading.Thread(target=listener)
3076 t.start()
3077 try:
3078 connector()
3079 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003080 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003081
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003082 def test_ssl_cert_verify_error(self):
3083 if support.verbose:
3084 sys.stdout.write("\n")
3085
3086 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3087 server_context.load_cert_chain(SIGNED_CERTFILE)
3088
3089 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3090
3091 server = ThreadedEchoServer(context=server_context, chatty=True)
3092 with server:
3093 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003094 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003095 try:
3096 s.connect((HOST, server.port))
3097 except ssl.SSLError as e:
3098 msg = 'unable to get local issuer certificate'
3099 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3100 self.assertEqual(e.verify_code, 20)
3101 self.assertEqual(e.verify_message, msg)
3102 self.assertIn(msg, repr(e))
3103 self.assertIn('certificate verify failed', repr(e))
3104
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003105 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3106 "OpenSSL is compiled without SSLv2 support")
3107 def test_protocol_sslv2(self):
3108 """Connecting to an SSLv2 server with various client options"""
3109 if support.verbose:
3110 sys.stdout.write("\n")
3111 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3112 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3113 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003114 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003115 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3116 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3117 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3118 # SSLv23 client with specific SSL options
3119 if no_sslv2_implies_sslv3_hello():
3120 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003121 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003122 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003123 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003124 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003125 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003126 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003127
Christian Heimesa170fa12017-09-15 20:27:30 +02003128 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003129 """Connecting to an SSLv23 server with various client options"""
3130 if support.verbose:
3131 sys.stdout.write("\n")
3132 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003133 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003134 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003135 except OSError as x:
3136 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3137 if support.verbose:
3138 sys.stdout.write(
3139 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3140 % str(x))
3141 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003142 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3143 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3144 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003145
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003146 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003147 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3148 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3149 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003150
3151 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003152 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3153 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3154 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003155
3156 # Server with specific SSL options
3157 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003158 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003159 server_options=ssl.OP_NO_SSLv3)
3160 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003161 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003162 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003163 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003164 server_options=ssl.OP_NO_TLSv1)
3165
3166
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003167 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3168 "OpenSSL is compiled without SSLv3 support")
3169 def test_protocol_sslv3(self):
3170 """Connecting to an SSLv3 server with various client options"""
3171 if support.verbose:
3172 sys.stdout.write("\n")
3173 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3174 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3175 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3176 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3177 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003178 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003179 client_options=ssl.OP_NO_SSLv3)
3180 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3181 if no_sslv2_implies_sslv3_hello():
3182 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003183 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003184 False, client_options=ssl.OP_NO_SSLv2)
3185
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003186 def test_protocol_tlsv1(self):
3187 """Connecting to a TLSv1 server with various client options"""
3188 if support.verbose:
3189 sys.stdout.write("\n")
3190 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3191 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3192 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3193 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3194 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3195 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3196 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003197 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003198 client_options=ssl.OP_NO_TLSv1)
3199
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003200 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3201 "TLS version 1.1 not supported.")
3202 def test_protocol_tlsv1_1(self):
3203 """Connecting to a TLSv1.1 server with various client options.
3204 Testing against older TLS versions."""
3205 if support.verbose:
3206 sys.stdout.write("\n")
3207 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3208 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3209 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3210 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3211 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003212 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003213 client_options=ssl.OP_NO_TLSv1_1)
3214
Christian Heimesa170fa12017-09-15 20:27:30 +02003215 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3217 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3218
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3220 "TLS version 1.2 not supported.")
3221 def test_protocol_tlsv1_2(self):
3222 """Connecting to a TLSv1.2 server with various client options.
3223 Testing against older TLS versions."""
3224 if support.verbose:
3225 sys.stdout.write("\n")
3226 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3227 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3228 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3229 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3230 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3231 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3232 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003233 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 client_options=ssl.OP_NO_TLSv1_2)
3235
Christian Heimesa170fa12017-09-15 20:27:30 +02003236 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003237 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3238 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3239 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3240 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3241
3242 def test_starttls(self):
3243 """Switching from clear text to encrypted and back again."""
3244 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3245
3246 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003247 starttls_server=True,
3248 chatty=True,
3249 connectionchatty=True)
3250 wrapped = False
3251 with server:
3252 s = socket.socket()
3253 s.setblocking(1)
3254 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003255 if support.verbose:
3256 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003257 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003258 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003259 sys.stdout.write(
3260 " client: sending %r...\n" % indata)
3261 if wrapped:
3262 conn.write(indata)
3263 outdata = conn.read()
3264 else:
3265 s.send(indata)
3266 outdata = s.recv(1024)
3267 msg = outdata.strip().lower()
3268 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3269 # STARTTLS ok, switch to secure mode
3270 if support.verbose:
3271 sys.stdout.write(
3272 " client: read %r from server, starting TLS...\n"
3273 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003274 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003275 wrapped = True
3276 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3277 # ENDTLS ok, switch back to clear text
3278 if support.verbose:
3279 sys.stdout.write(
3280 " client: read %r from server, ending TLS...\n"
3281 % msg)
3282 s = conn.unwrap()
3283 wrapped = False
3284 else:
3285 if support.verbose:
3286 sys.stdout.write(
3287 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003288 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 sys.stdout.write(" client: closing connection.\n")
3290 if wrapped:
3291 conn.write(b"over\n")
3292 else:
3293 s.send(b"over\n")
3294 if wrapped:
3295 conn.close()
3296 else:
3297 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003298
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003299 def test_socketserver(self):
3300 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003301 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003302 # try to connect
3303 if support.verbose:
3304 sys.stdout.write('\n')
3305 with open(CERTFILE, 'rb') as f:
3306 d1 = f.read()
3307 d2 = ''
3308 # now fetch the same data from the HTTPS server
3309 url = 'https://localhost:%d/%s' % (
3310 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003311 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 f = urllib.request.urlopen(url, context=context)
3313 try:
3314 dlen = f.info().get("content-length")
3315 if dlen and (int(dlen) > 0):
3316 d2 = f.read(int(dlen))
3317 if support.verbose:
3318 sys.stdout.write(
3319 " client: read %d bytes from remote server '%s'\n"
3320 % (len(d2), server))
3321 finally:
3322 f.close()
3323 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 def test_asyncore_server(self):
3326 """Check the example asyncore integration."""
3327 if support.verbose:
3328 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003329
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 indata = b"FOO\n"
3331 server = AsyncoreEchoServer(CERTFILE)
3332 with server:
3333 s = test_wrap_socket(socket.socket())
3334 s.connect(('127.0.0.1', server.port))
3335 if support.verbose:
3336 sys.stdout.write(
3337 " client: sending %r...\n" % indata)
3338 s.write(indata)
3339 outdata = s.read()
3340 if support.verbose:
3341 sys.stdout.write(" client: read %r\n" % outdata)
3342 if outdata != indata.lower():
3343 self.fail(
3344 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3345 % (outdata[:20], len(outdata),
3346 indata[:20].lower(), len(indata)))
3347 s.write(b"over\n")
3348 if support.verbose:
3349 sys.stdout.write(" client: closing connection.\n")
3350 s.close()
3351 if support.verbose:
3352 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003353
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003354 def test_recv_send(self):
3355 """Test recv(), send() and friends."""
3356 if support.verbose:
3357 sys.stdout.write("\n")
3358
3359 server = ThreadedEchoServer(CERTFILE,
3360 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003361 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003362 cacerts=CERTFILE,
3363 chatty=True,
3364 connectionchatty=False)
3365 with server:
3366 s = test_wrap_socket(socket.socket(),
3367 server_side=False,
3368 certfile=CERTFILE,
3369 ca_certs=CERTFILE,
3370 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003371 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003372 s.connect((HOST, server.port))
3373 # helper methods for standardising recv* method signatures
3374 def _recv_into():
3375 b = bytearray(b"\0"*100)
3376 count = s.recv_into(b)
3377 return b[:count]
3378
3379 def _recvfrom_into():
3380 b = bytearray(b"\0"*100)
3381 count, addr = s.recvfrom_into(b)
3382 return b[:count]
3383
3384 # (name, method, expect success?, *args, return value func)
3385 send_methods = [
3386 ('send', s.send, True, [], len),
3387 ('sendto', s.sendto, False, ["some.address"], len),
3388 ('sendall', s.sendall, True, [], lambda x: None),
3389 ]
3390 # (name, method, whether to expect success, *args)
3391 recv_methods = [
3392 ('recv', s.recv, True, []),
3393 ('recvfrom', s.recvfrom, False, ["some.address"]),
3394 ('recv_into', _recv_into, True, []),
3395 ('recvfrom_into', _recvfrom_into, False, []),
3396 ]
3397 data_prefix = "PREFIX_"
3398
3399 for (meth_name, send_meth, expect_success, args,
3400 ret_val_meth) in send_methods:
3401 indata = (data_prefix + meth_name).encode('ascii')
3402 try:
3403 ret = send_meth(indata, *args)
3404 msg = "sending with {}".format(meth_name)
3405 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3406 outdata = s.read()
3407 if outdata != indata.lower():
3408 self.fail(
3409 "While sending with <<{name:s}>> bad data "
3410 "<<{outdata:r}>> ({nout:d}) received; "
3411 "expected <<{indata:r}>> ({nin:d})\n".format(
3412 name=meth_name, outdata=outdata[:20],
3413 nout=len(outdata),
3414 indata=indata[:20], nin=len(indata)
3415 )
3416 )
3417 except ValueError as e:
3418 if expect_success:
3419 self.fail(
3420 "Failed to send with method <<{name:s}>>; "
3421 "expected to succeed.\n".format(name=meth_name)
3422 )
3423 if not str(e).startswith(meth_name):
3424 self.fail(
3425 "Method <<{name:s}>> failed with unexpected "
3426 "exception message: {exp:s}\n".format(
3427 name=meth_name, exp=e
3428 )
3429 )
3430
3431 for meth_name, recv_meth, expect_success, args in recv_methods:
3432 indata = (data_prefix + meth_name).encode('ascii')
3433 try:
3434 s.send(indata)
3435 outdata = recv_meth(*args)
3436 if outdata != indata.lower():
3437 self.fail(
3438 "While receiving with <<{name:s}>> bad data "
3439 "<<{outdata:r}>> ({nout:d}) received; "
3440 "expected <<{indata:r}>> ({nin:d})\n".format(
3441 name=meth_name, outdata=outdata[:20],
3442 nout=len(outdata),
3443 indata=indata[:20], nin=len(indata)
3444 )
3445 )
3446 except ValueError as e:
3447 if expect_success:
3448 self.fail(
3449 "Failed to receive with method <<{name:s}>>; "
3450 "expected to succeed.\n".format(name=meth_name)
3451 )
3452 if not str(e).startswith(meth_name):
3453 self.fail(
3454 "Method <<{name:s}>> failed with unexpected "
3455 "exception message: {exp:s}\n".format(
3456 name=meth_name, exp=e
3457 )
3458 )
3459 # consume data
3460 s.read()
3461
3462 # read(-1, buffer) is supported, even though read(-1) is not
3463 data = b"data"
3464 s.send(data)
3465 buffer = bytearray(len(data))
3466 self.assertEqual(s.read(-1, buffer), len(data))
3467 self.assertEqual(buffer, data)
3468
Christian Heimes888bbdc2017-09-07 14:18:21 -07003469 # sendall accepts bytes-like objects
3470 if ctypes is not None:
3471 ubyte = ctypes.c_ubyte * len(data)
3472 byteslike = ubyte.from_buffer_copy(data)
3473 s.sendall(byteslike)
3474 self.assertEqual(s.read(), data)
3475
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003476 # Make sure sendmsg et al are disallowed to avoid
3477 # inadvertent disclosure of data and/or corruption
3478 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003479 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003480 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3481 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3482 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003483 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003484 s.write(b"over\n")
3485
3486 self.assertRaises(ValueError, s.recv, -1)
3487 self.assertRaises(ValueError, s.read, -1)
3488
3489 s.close()
3490
3491 def test_recv_zero(self):
3492 server = ThreadedEchoServer(CERTFILE)
3493 server.__enter__()
3494 self.addCleanup(server.__exit__, None, None)
3495 s = socket.create_connection((HOST, server.port))
3496 self.addCleanup(s.close)
3497 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3498 self.addCleanup(s.close)
3499
3500 # recv/read(0) should return no data
3501 s.send(b"data")
3502 self.assertEqual(s.recv(0), b"")
3503 self.assertEqual(s.read(0), b"")
3504 self.assertEqual(s.read(), b"data")
3505
3506 # Should not block if the other end sends no data
3507 s.setblocking(False)
3508 self.assertEqual(s.recv(0), b"")
3509 self.assertEqual(s.recv_into(bytearray()), 0)
3510
3511 def test_nonblocking_send(self):
3512 server = ThreadedEchoServer(CERTFILE,
3513 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003514 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003515 cacerts=CERTFILE,
3516 chatty=True,
3517 connectionchatty=False)
3518 with server:
3519 s = test_wrap_socket(socket.socket(),
3520 server_side=False,
3521 certfile=CERTFILE,
3522 ca_certs=CERTFILE,
3523 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003524 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003525 s.connect((HOST, server.port))
3526 s.setblocking(False)
3527
3528 # If we keep sending data, at some point the buffers
3529 # will be full and the call will block
3530 buf = bytearray(8192)
3531 def fill_buffer():
3532 while True:
3533 s.send(buf)
3534 self.assertRaises((ssl.SSLWantWriteError,
3535 ssl.SSLWantReadError), fill_buffer)
3536
3537 # Now read all the output and discard it
3538 s.setblocking(True)
3539 s.close()
3540
3541 def test_handshake_timeout(self):
3542 # Issue #5103: SSL handshake must respect the socket timeout
3543 server = socket.socket(socket.AF_INET)
3544 host = "127.0.0.1"
3545 port = support.bind_port(server)
3546 started = threading.Event()
3547 finish = False
3548
3549 def serve():
3550 server.listen()
3551 started.set()
3552 conns = []
3553 while not finish:
3554 r, w, e = select.select([server], [], [], 0.1)
3555 if server in r:
3556 # Let the socket hang around rather than having
3557 # it closed by garbage collection.
3558 conns.append(server.accept()[0])
3559 for sock in conns:
3560 sock.close()
3561
3562 t = threading.Thread(target=serve)
3563 t.start()
3564 started.wait()
3565
3566 try:
3567 try:
3568 c = socket.socket(socket.AF_INET)
3569 c.settimeout(0.2)
3570 c.connect((host, port))
3571 # Will attempt handshake and time out
3572 self.assertRaisesRegex(socket.timeout, "timed out",
3573 test_wrap_socket, c)
3574 finally:
3575 c.close()
3576 try:
3577 c = socket.socket(socket.AF_INET)
3578 c = test_wrap_socket(c)
3579 c.settimeout(0.2)
3580 # Will attempt handshake and time out
3581 self.assertRaisesRegex(socket.timeout, "timed out",
3582 c.connect, (host, port))
3583 finally:
3584 c.close()
3585 finally:
3586 finish = True
3587 t.join()
3588 server.close()
3589
3590 def test_server_accept(self):
3591 # Issue #16357: accept() on a SSLSocket created through
3592 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003593 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003594 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003595 context.load_verify_locations(SIGNING_CA)
3596 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003597 server = socket.socket(socket.AF_INET)
3598 host = "127.0.0.1"
3599 port = support.bind_port(server)
3600 server = context.wrap_socket(server, server_side=True)
3601 self.assertTrue(server.server_side)
3602
3603 evt = threading.Event()
3604 remote = None
3605 peer = None
3606 def serve():
3607 nonlocal remote, peer
3608 server.listen()
3609 # Block on the accept and wait on the connection to close.
3610 evt.set()
3611 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003612 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003613
3614 t = threading.Thread(target=serve)
3615 t.start()
3616 # Client wait until server setup and perform a connect.
3617 evt.wait()
3618 client = context.wrap_socket(socket.socket())
3619 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003620 client.send(b'data')
3621 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003622 client_addr = client.getsockname()
3623 client.close()
3624 t.join()
3625 remote.close()
3626 server.close()
3627 # Sanity checks.
3628 self.assertIsInstance(remote, ssl.SSLSocket)
3629 self.assertEqual(peer, client_addr)
3630
3631 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003632 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003633 with context.wrap_socket(socket.socket()) as sock:
3634 with self.assertRaises(OSError) as cm:
3635 sock.getpeercert()
3636 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3637
3638 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003639 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003640 with context.wrap_socket(socket.socket()) as sock:
3641 with self.assertRaises(OSError) as cm:
3642 sock.do_handshake()
3643 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3644
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003645 def test_no_shared_ciphers(self):
3646 client_context, server_context, hostname = testing_context()
3647 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3648 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003649 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003650 client_context.set_ciphers("AES128")
3651 server_context.set_ciphers("AES256")
3652 with ThreadedEchoServer(context=server_context) as server:
3653 with client_context.wrap_socket(socket.socket(),
3654 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003655 with self.assertRaises(OSError):
3656 s.connect((HOST, server.port))
3657 self.assertIn("no shared cipher", server.conn_errors[0])
3658
3659 def test_version_basic(self):
3660 """
3661 Basic tests for SSLSocket.version().
3662 More tests are done in the test_protocol_*() methods.
3663 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003664 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3665 context.check_hostname = False
3666 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003667 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003668 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003669 chatty=False) as server:
3670 with context.wrap_socket(socket.socket()) as s:
3671 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003672 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003673 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003674 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003675 self.assertEqual(s.version(), 'TLSv1.3')
3676 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003677 self.assertEqual(s.version(), 'TLSv1.2')
3678 else: # 0.9.8 to 1.0.1
3679 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003680 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003681 self.assertIs(s.version(), None)
3682
Christian Heimescb5b68a2017-09-07 18:07:00 -07003683 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3684 "test requires TLSv1.3 enabled OpenSSL")
3685 def test_tls1_3(self):
3686 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3687 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003688 context.options |= (
3689 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3690 )
3691 with ThreadedEchoServer(context=context) as server:
3692 with context.wrap_socket(socket.socket()) as s:
3693 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003694 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003695 'TLS_AES_256_GCM_SHA384',
3696 'TLS_CHACHA20_POLY1305_SHA256',
3697 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003698 })
3699 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003700
Christian Heimes698dde12018-02-27 11:54:43 +01003701 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3702 "required OpenSSL 1.1.0g")
3703 def test_min_max_version(self):
3704 client_context, server_context, hostname = testing_context()
3705 # client TLSv1.0 to 1.2
3706 client_context.minimum_version = ssl.TLSVersion.TLSv1
3707 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3708 # server only TLSv1.2
3709 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3710 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3711
3712 with ThreadedEchoServer(context=server_context) as server:
3713 with client_context.wrap_socket(socket.socket(),
3714 server_hostname=hostname) as s:
3715 s.connect((HOST, server.port))
3716 self.assertEqual(s.version(), 'TLSv1.2')
3717
3718 # client 1.0 to 1.2, server 1.0 to 1.1
3719 server_context.minimum_version = ssl.TLSVersion.TLSv1
3720 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3721
3722 with ThreadedEchoServer(context=server_context) as server:
3723 with client_context.wrap_socket(socket.socket(),
3724 server_hostname=hostname) as s:
3725 s.connect((HOST, server.port))
3726 self.assertEqual(s.version(), 'TLSv1.1')
3727
3728 # client 1.0, server 1.2 (mismatch)
3729 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3730 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003731 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003732 client_context.maximum_version = ssl.TLSVersion.TLSv1
3733 with ThreadedEchoServer(context=server_context) as server:
3734 with client_context.wrap_socket(socket.socket(),
3735 server_hostname=hostname) as s:
3736 with self.assertRaises(ssl.SSLError) as e:
3737 s.connect((HOST, server.port))
3738 self.assertIn("alert", str(e.exception))
3739
3740
3741 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3742 "required OpenSSL 1.1.0g")
3743 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3744 def test_min_max_version_sslv3(self):
3745 client_context, server_context, hostname = testing_context()
3746 server_context.minimum_version = ssl.TLSVersion.SSLv3
3747 client_context.minimum_version = ssl.TLSVersion.SSLv3
3748 client_context.maximum_version = ssl.TLSVersion.SSLv3
3749 with ThreadedEchoServer(context=server_context) as server:
3750 with client_context.wrap_socket(socket.socket(),
3751 server_hostname=hostname) as s:
3752 s.connect((HOST, server.port))
3753 self.assertEqual(s.version(), 'SSLv3')
3754
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003755 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3756 def test_default_ecdh_curve(self):
3757 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3758 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003759 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003760 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003761 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3762 # cipher name.
3763 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003764 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3765 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3766 # our default cipher list should prefer ECDH-based ciphers
3767 # automatically.
3768 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3769 context.set_ciphers("ECCdraft:ECDH")
3770 with ThreadedEchoServer(context=context) as server:
3771 with context.wrap_socket(socket.socket()) as s:
3772 s.connect((HOST, server.port))
3773 self.assertIn("ECDH", s.cipher()[0])
3774
3775 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3776 "'tls-unique' channel binding not available")
3777 def test_tls_unique_channel_binding(self):
3778 """Test tls-unique channel binding."""
3779 if support.verbose:
3780 sys.stdout.write("\n")
3781
Christian Heimes05d9fe32018-02-27 08:55:39 +01003782 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003783
3784 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003785 chatty=True,
3786 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003787
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003788 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003789 with client_context.wrap_socket(
3790 socket.socket(),
3791 server_hostname=hostname) as s:
3792 s.connect((HOST, server.port))
3793 # get the data
3794 cb_data = s.get_channel_binding("tls-unique")
3795 if support.verbose:
3796 sys.stdout.write(
3797 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003798
Christian Heimes05d9fe32018-02-27 08:55:39 +01003799 # check if it is sane
3800 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003801 if s.version() == 'TLSv1.3':
3802 self.assertEqual(len(cb_data), 48)
3803 else:
3804 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003805
Christian Heimes05d9fe32018-02-27 08:55:39 +01003806 # and compare with the peers version
3807 s.write(b"CB tls-unique\n")
3808 peer_data_repr = s.read().strip()
3809 self.assertEqual(peer_data_repr,
3810 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003811
3812 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003813 with client_context.wrap_socket(
3814 socket.socket(),
3815 server_hostname=hostname) as s:
3816 s.connect((HOST, server.port))
3817 new_cb_data = s.get_channel_binding("tls-unique")
3818 if support.verbose:
3819 sys.stdout.write(
3820 "got another channel binding data: {0!r}\n".format(
3821 new_cb_data)
3822 )
3823 # is it really unique
3824 self.assertNotEqual(cb_data, new_cb_data)
3825 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003826 if s.version() == 'TLSv1.3':
3827 self.assertEqual(len(cb_data), 48)
3828 else:
3829 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003830 s.write(b"CB tls-unique\n")
3831 peer_data_repr = s.read().strip()
3832 self.assertEqual(peer_data_repr,
3833 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003834
3835 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003836 client_context, server_context, hostname = testing_context()
3837 stats = server_params_test(client_context, server_context,
3838 chatty=True, connectionchatty=True,
3839 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003840 if support.verbose:
3841 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3842 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3843
3844 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3845 "ssl.OP_NO_COMPRESSION needed for this test")
3846 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003847 client_context, server_context, hostname = testing_context()
3848 client_context.options |= ssl.OP_NO_COMPRESSION
3849 server_context.options |= ssl.OP_NO_COMPRESSION
3850 stats = server_params_test(client_context, server_context,
3851 chatty=True, connectionchatty=True,
3852 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003853 self.assertIs(stats['compression'], None)
3854
3855 def test_dh_params(self):
3856 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003857 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003858 # test scenario needs TLS <= 1.2
3859 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003860 server_context.load_dh_params(DHFILE)
3861 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003862 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003863 stats = server_params_test(client_context, server_context,
3864 chatty=True, connectionchatty=True,
3865 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003866 cipher = stats["cipher"][0]
3867 parts = cipher.split("-")
3868 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3869 self.fail("Non-DH cipher: " + cipher[0])
3870
Christian Heimesb7b92252018-02-25 09:49:31 +01003871 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003872 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003873 def test_ecdh_curve(self):
3874 # server secp384r1, client auto
3875 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003876
Christian Heimesb7b92252018-02-25 09:49:31 +01003877 server_context.set_ecdh_curve("secp384r1")
3878 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3879 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3880 stats = server_params_test(client_context, server_context,
3881 chatty=True, connectionchatty=True,
3882 sni_name=hostname)
3883
3884 # server auto, client secp384r1
3885 client_context, server_context, hostname = testing_context()
3886 client_context.set_ecdh_curve("secp384r1")
3887 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3888 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3889 stats = server_params_test(client_context, server_context,
3890 chatty=True, connectionchatty=True,
3891 sni_name=hostname)
3892
3893 # server / client curve mismatch
3894 client_context, server_context, hostname = testing_context()
3895 client_context.set_ecdh_curve("prime256v1")
3896 server_context.set_ecdh_curve("secp384r1")
3897 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3898 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3899 try:
3900 stats = server_params_test(client_context, server_context,
3901 chatty=True, connectionchatty=True,
3902 sni_name=hostname)
3903 except ssl.SSLError:
3904 pass
3905 else:
3906 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003907 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003908 self.fail("mismatch curve did not fail")
3909
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910 def test_selected_alpn_protocol(self):
3911 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003912 client_context, server_context, hostname = testing_context()
3913 stats = server_params_test(client_context, server_context,
3914 chatty=True, connectionchatty=True,
3915 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003916 self.assertIs(stats['client_alpn_protocol'], None)
3917
3918 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3919 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3920 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003921 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003922 server_context.set_alpn_protocols(['foo', 'bar'])
3923 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003924 chatty=True, connectionchatty=True,
3925 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003926 self.assertIs(stats['client_alpn_protocol'], None)
3927
3928 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3929 def test_alpn_protocols(self):
3930 server_protocols = ['foo', 'bar', 'milkshake']
3931 protocol_tests = [
3932 (['foo', 'bar'], 'foo'),
3933 (['bar', 'foo'], 'foo'),
3934 (['milkshake'], 'milkshake'),
3935 (['http/3.0', 'http/4.0'], None)
3936 ]
3937 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003938 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003940 client_context.set_alpn_protocols(client_protocols)
3941
3942 try:
3943 stats = server_params_test(client_context,
3944 server_context,
3945 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003946 connectionchatty=True,
3947 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003948 except ssl.SSLError as e:
3949 stats = e
3950
Christian Heimes05d9fe32018-02-27 08:55:39 +01003951 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003952 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3953 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3954 self.assertIsInstance(stats, ssl.SSLError)
3955 else:
3956 msg = "failed trying %s (s) and %s (c).\n" \
3957 "was expecting %s, but got %%s from the %%s" \
3958 % (str(server_protocols), str(client_protocols),
3959 str(expected))
3960 client_result = stats['client_alpn_protocol']
3961 self.assertEqual(client_result, expected,
3962 msg % (client_result, "client"))
3963 server_result = stats['server_alpn_protocols'][-1] \
3964 if len(stats['server_alpn_protocols']) else 'nothing'
3965 self.assertEqual(server_result, expected,
3966 msg % (server_result, "server"))
3967
3968 def test_selected_npn_protocol(self):
3969 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003970 client_context, server_context, hostname = testing_context()
3971 stats = server_params_test(client_context, server_context,
3972 chatty=True, connectionchatty=True,
3973 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003974 self.assertIs(stats['client_npn_protocol'], None)
3975
3976 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3977 def test_npn_protocols(self):
3978 server_protocols = ['http/1.1', 'spdy/2']
3979 protocol_tests = [
3980 (['http/1.1', 'spdy/2'], 'http/1.1'),
3981 (['spdy/2', 'http/1.1'], 'http/1.1'),
3982 (['spdy/2', 'test'], 'spdy/2'),
3983 (['abc', 'def'], 'abc')
3984 ]
3985 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003986 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003987 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003988 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003989 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003990 chatty=True, connectionchatty=True,
3991 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003992 msg = "failed trying %s (s) and %s (c).\n" \
3993 "was expecting %s, but got %%s from the %%s" \
3994 % (str(server_protocols), str(client_protocols),
3995 str(expected))
3996 client_result = stats['client_npn_protocol']
3997 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3998 server_result = stats['server_npn_protocols'][-1] \
3999 if len(stats['server_npn_protocols']) else 'nothing'
4000 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4001
4002 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004003 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004004 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004005 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004006 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004007 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004008 client_context.load_verify_locations(SIGNING_CA)
4009 return server_context, other_context, client_context
4010
4011 def check_common_name(self, stats, name):
4012 cert = stats['peercert']
4013 self.assertIn((('commonName', name),), cert['subject'])
4014
4015 @needs_sni
4016 def test_sni_callback(self):
4017 calls = []
4018 server_context, other_context, client_context = self.sni_contexts()
4019
Christian Heimesa170fa12017-09-15 20:27:30 +02004020 client_context.check_hostname = False
4021
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004022 def servername_cb(ssl_sock, server_name, initial_context):
4023 calls.append((server_name, initial_context))
4024 if server_name is not None:
4025 ssl_sock.context = other_context
4026 server_context.set_servername_callback(servername_cb)
4027
4028 stats = server_params_test(client_context, server_context,
4029 chatty=True,
4030 sni_name='supermessage')
4031 # The hostname was fetched properly, and the certificate was
4032 # changed for the connection.
4033 self.assertEqual(calls, [("supermessage", server_context)])
4034 # CERTFILE4 was selected
4035 self.check_common_name(stats, 'fakehostname')
4036
4037 calls = []
4038 # The callback is called with server_name=None
4039 stats = server_params_test(client_context, server_context,
4040 chatty=True,
4041 sni_name=None)
4042 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004043 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004044
4045 # Check disabling the callback
4046 calls = []
4047 server_context.set_servername_callback(None)
4048
4049 stats = server_params_test(client_context, server_context,
4050 chatty=True,
4051 sni_name='notfunny')
4052 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004053 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004054 self.assertEqual(calls, [])
4055
4056 @needs_sni
4057 def test_sni_callback_alert(self):
4058 # Returning a TLS alert is reflected to the connecting client
4059 server_context, other_context, client_context = self.sni_contexts()
4060
4061 def cb_returning_alert(ssl_sock, server_name, initial_context):
4062 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4063 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004064 with self.assertRaises(ssl.SSLError) as cm:
4065 stats = server_params_test(client_context, server_context,
4066 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004067 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004068 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004069
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004070 @needs_sni
4071 def test_sni_callback_raising(self):
4072 # Raising fails the connection with a TLS handshake failure alert.
4073 server_context, other_context, client_context = self.sni_contexts()
4074
4075 def cb_raising(ssl_sock, server_name, initial_context):
4076 1/0
4077 server_context.set_servername_callback(cb_raising)
4078
Victor Stinner00253502019-06-03 03:51:43 +02004079 with support.catch_unraisable_exception() as catch:
4080 with self.assertRaises(ssl.SSLError) as cm:
4081 stats = server_params_test(client_context, server_context,
4082 chatty=False,
4083 sni_name='supermessage')
4084
4085 self.assertEqual(cm.exception.reason,
4086 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4087 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004088
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004089 @needs_sni
4090 def test_sni_callback_wrong_return_type(self):
4091 # Returning the wrong return type terminates the TLS connection
4092 # with an internal error alert.
4093 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004094
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004095 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4096 return "foo"
4097 server_context.set_servername_callback(cb_wrong_return_type)
4098
Victor Stinner00253502019-06-03 03:51:43 +02004099 with support.catch_unraisable_exception() as catch:
4100 with self.assertRaises(ssl.SSLError) as cm:
4101 stats = server_params_test(client_context, server_context,
4102 chatty=False,
4103 sni_name='supermessage')
4104
4105
4106 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4107 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004108
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004109 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004110 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004111 client_context.set_ciphers("AES128:AES256")
4112 server_context.set_ciphers("AES256")
4113 expected_algs = [
4114 "AES256", "AES-256",
4115 # TLS 1.3 ciphers are always enabled
4116 "TLS_CHACHA20", "TLS_AES",
4117 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004118
Christian Heimesa170fa12017-09-15 20:27:30 +02004119 stats = server_params_test(client_context, server_context,
4120 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004121 ciphers = stats['server_shared_ciphers'][0]
4122 self.assertGreater(len(ciphers), 0)
4123 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004124 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004125 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004126
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004127 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004128 client_context, server_context, hostname = testing_context()
4129 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004130
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004131 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004132 s = client_context.wrap_socket(socket.socket(),
4133 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004134 s.connect((HOST, server.port))
4135 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004136
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004137 self.assertRaises(ValueError, s.read, 1024)
4138 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004139
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004140 def test_sendfile(self):
4141 TEST_DATA = b"x" * 512
4142 with open(support.TESTFN, 'wb') as f:
4143 f.write(TEST_DATA)
4144 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004145 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004146 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004147 context.load_verify_locations(SIGNING_CA)
4148 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004149 server = ThreadedEchoServer(context=context, chatty=False)
4150 with server:
4151 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004152 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004153 with open(support.TESTFN, 'rb') as file:
4154 s.sendfile(file)
4155 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004156
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004157 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004158 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004159 # TODO: sessions aren't compatible with TLSv1.3 yet
4160 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004161
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004162 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004163 stats = server_params_test(client_context, server_context,
4164 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004165 session = stats['session']
4166 self.assertTrue(session.id)
4167 self.assertGreater(session.time, 0)
4168 self.assertGreater(session.timeout, 0)
4169 self.assertTrue(session.has_ticket)
4170 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4171 self.assertGreater(session.ticket_lifetime_hint, 0)
4172 self.assertFalse(stats['session_reused'])
4173 sess_stat = server_context.session_stats()
4174 self.assertEqual(sess_stat['accept'], 1)
4175 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004176
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004177 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004178 stats = server_params_test(client_context, server_context,
4179 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004180 sess_stat = server_context.session_stats()
4181 self.assertEqual(sess_stat['accept'], 2)
4182 self.assertEqual(sess_stat['hits'], 1)
4183 self.assertTrue(stats['session_reused'])
4184 session2 = stats['session']
4185 self.assertEqual(session2.id, session.id)
4186 self.assertEqual(session2, session)
4187 self.assertIsNot(session2, session)
4188 self.assertGreaterEqual(session2.time, session.time)
4189 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004190
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004191 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004192 stats = server_params_test(client_context, server_context,
4193 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004194 self.assertFalse(stats['session_reused'])
4195 session3 = stats['session']
4196 self.assertNotEqual(session3.id, session.id)
4197 self.assertNotEqual(session3, session)
4198 sess_stat = server_context.session_stats()
4199 self.assertEqual(sess_stat['accept'], 3)
4200 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004201
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004202 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004203 stats = server_params_test(client_context, server_context,
4204 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004205 self.assertTrue(stats['session_reused'])
4206 session4 = stats['session']
4207 self.assertEqual(session4.id, session.id)
4208 self.assertEqual(session4, session)
4209 self.assertGreaterEqual(session4.time, session.time)
4210 self.assertGreaterEqual(session4.timeout, session.timeout)
4211 sess_stat = server_context.session_stats()
4212 self.assertEqual(sess_stat['accept'], 4)
4213 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004214
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004215 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004216 client_context, server_context, hostname = testing_context()
4217 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004218
Christian Heimes05d9fe32018-02-27 08:55:39 +01004219 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004220 client_context.options |= ssl.OP_NO_TLSv1_3
4221 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004222
Christian Heimesa170fa12017-09-15 20:27:30 +02004223 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004224 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004225 with client_context.wrap_socket(socket.socket(),
4226 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004227 # session is None before handshake
4228 self.assertEqual(s.session, None)
4229 self.assertEqual(s.session_reused, None)
4230 s.connect((HOST, server.port))
4231 session = s.session
4232 self.assertTrue(session)
4233 with self.assertRaises(TypeError) as e:
4234 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004235 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004236
Christian Heimesa170fa12017-09-15 20:27:30 +02004237 with client_context.wrap_socket(socket.socket(),
4238 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004239 s.connect((HOST, server.port))
4240 # cannot set session after handshake
4241 with self.assertRaises(ValueError) as e:
4242 s.session = session
4243 self.assertEqual(str(e.exception),
4244 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004245
Christian Heimesa170fa12017-09-15 20:27:30 +02004246 with client_context.wrap_socket(socket.socket(),
4247 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004248 # can set session before handshake and before the
4249 # connection was established
4250 s.session = session
4251 s.connect((HOST, server.port))
4252 self.assertEqual(s.session.id, session.id)
4253 self.assertEqual(s.session, session)
4254 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004255
Christian Heimesa170fa12017-09-15 20:27:30 +02004256 with client_context2.wrap_socket(socket.socket(),
4257 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004258 # cannot re-use session with a different SSLContext
4259 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004260 s.session = session
4261 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004262 self.assertEqual(str(e.exception),
4263 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004264
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004265
Christian Heimes9fb051f2018-09-23 08:32:31 +02004266@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4267class TestPostHandshakeAuth(unittest.TestCase):
4268 def test_pha_setter(self):
4269 protocols = [
4270 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4271 ]
4272 for protocol in protocols:
4273 ctx = ssl.SSLContext(protocol)
4274 self.assertEqual(ctx.post_handshake_auth, False)
4275
4276 ctx.post_handshake_auth = True
4277 self.assertEqual(ctx.post_handshake_auth, True)
4278
4279 ctx.verify_mode = ssl.CERT_REQUIRED
4280 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4281 self.assertEqual(ctx.post_handshake_auth, True)
4282
4283 ctx.post_handshake_auth = False
4284 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4285 self.assertEqual(ctx.post_handshake_auth, False)
4286
4287 ctx.verify_mode = ssl.CERT_OPTIONAL
4288 ctx.post_handshake_auth = True
4289 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4290 self.assertEqual(ctx.post_handshake_auth, True)
4291
4292 def test_pha_required(self):
4293 client_context, server_context, hostname = testing_context()
4294 server_context.post_handshake_auth = True
4295 server_context.verify_mode = ssl.CERT_REQUIRED
4296 client_context.post_handshake_auth = True
4297 client_context.load_cert_chain(SIGNED_CERTFILE)
4298
4299 server = ThreadedEchoServer(context=server_context, chatty=False)
4300 with server:
4301 with client_context.wrap_socket(socket.socket(),
4302 server_hostname=hostname) as s:
4303 s.connect((HOST, server.port))
4304 s.write(b'HASCERT')
4305 self.assertEqual(s.recv(1024), b'FALSE\n')
4306 s.write(b'PHA')
4307 self.assertEqual(s.recv(1024), b'OK\n')
4308 s.write(b'HASCERT')
4309 self.assertEqual(s.recv(1024), b'TRUE\n')
4310 # PHA method just returns true when cert is already available
4311 s.write(b'PHA')
4312 self.assertEqual(s.recv(1024), b'OK\n')
4313 s.write(b'GETCERT')
4314 cert_text = s.recv(4096).decode('us-ascii')
4315 self.assertIn('Python Software Foundation CA', cert_text)
4316
4317 def test_pha_required_nocert(self):
4318 client_context, server_context, hostname = testing_context()
4319 server_context.post_handshake_auth = True
4320 server_context.verify_mode = ssl.CERT_REQUIRED
4321 client_context.post_handshake_auth = True
4322
Miss Islington (bot)4c403b82019-07-09 05:55:08 -07004323 # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
4324 # (it is only raised sometimes on Windows)
4325 with support.catch_threading_exception() as cm:
4326 server = ThreadedEchoServer(context=server_context, chatty=False)
4327 with server:
4328 with client_context.wrap_socket(socket.socket(),
4329 server_hostname=hostname) as s:
4330 s.connect((HOST, server.port))
4331 s.write(b'PHA')
4332 # receive CertificateRequest
4333 self.assertEqual(s.recv(1024), b'OK\n')
4334 # send empty Certificate + Finish
4335 s.write(b'HASCERT')
4336 # receive alert
4337 with self.assertRaisesRegex(
4338 ssl.SSLError,
4339 'tlsv13 alert certificate required'):
4340 s.recv(1024)
Christian Heimes9fb051f2018-09-23 08:32:31 +02004341
4342 def test_pha_optional(self):
4343 if support.verbose:
4344 sys.stdout.write("\n")
4345
4346 client_context, server_context, hostname = testing_context()
4347 server_context.post_handshake_auth = True
4348 server_context.verify_mode = ssl.CERT_REQUIRED
4349 client_context.post_handshake_auth = True
4350 client_context.load_cert_chain(SIGNED_CERTFILE)
4351
4352 # check CERT_OPTIONAL
4353 server_context.verify_mode = ssl.CERT_OPTIONAL
4354 server = ThreadedEchoServer(context=server_context, chatty=False)
4355 with server:
4356 with client_context.wrap_socket(socket.socket(),
4357 server_hostname=hostname) as s:
4358 s.connect((HOST, server.port))
4359 s.write(b'HASCERT')
4360 self.assertEqual(s.recv(1024), b'FALSE\n')
4361 s.write(b'PHA')
4362 self.assertEqual(s.recv(1024), b'OK\n')
4363 s.write(b'HASCERT')
4364 self.assertEqual(s.recv(1024), b'TRUE\n')
4365
4366 def test_pha_optional_nocert(self):
4367 if support.verbose:
4368 sys.stdout.write("\n")
4369
4370 client_context, server_context, hostname = testing_context()
4371 server_context.post_handshake_auth = True
4372 server_context.verify_mode = ssl.CERT_OPTIONAL
4373 client_context.post_handshake_auth = True
4374
4375 server = ThreadedEchoServer(context=server_context, chatty=False)
4376 with server:
4377 with client_context.wrap_socket(socket.socket(),
4378 server_hostname=hostname) as s:
4379 s.connect((HOST, server.port))
4380 s.write(b'HASCERT')
4381 self.assertEqual(s.recv(1024), b'FALSE\n')
4382 s.write(b'PHA')
4383 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004384 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004385 s.write(b'HASCERT')
4386 self.assertEqual(s.recv(1024), b'FALSE\n')
4387
4388 def test_pha_no_pha_client(self):
4389 client_context, server_context, hostname = testing_context()
4390 server_context.post_handshake_auth = True
4391 server_context.verify_mode = ssl.CERT_REQUIRED
4392 client_context.load_cert_chain(SIGNED_CERTFILE)
4393
4394 server = ThreadedEchoServer(context=server_context, chatty=False)
4395 with server:
4396 with client_context.wrap_socket(socket.socket(),
4397 server_hostname=hostname) as s:
4398 s.connect((HOST, server.port))
4399 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4400 s.verify_client_post_handshake()
4401 s.write(b'PHA')
4402 self.assertIn(b'extension not received', s.recv(1024))
4403
4404 def test_pha_no_pha_server(self):
4405 # server doesn't have PHA enabled, cert is requested in handshake
4406 client_context, server_context, hostname = testing_context()
4407 server_context.verify_mode = ssl.CERT_REQUIRED
4408 client_context.post_handshake_auth = True
4409 client_context.load_cert_chain(SIGNED_CERTFILE)
4410
4411 server = ThreadedEchoServer(context=server_context, chatty=False)
4412 with server:
4413 with client_context.wrap_socket(socket.socket(),
4414 server_hostname=hostname) as s:
4415 s.connect((HOST, server.port))
4416 s.write(b'HASCERT')
4417 self.assertEqual(s.recv(1024), b'TRUE\n')
4418 # PHA doesn't fail if there is already a cert
4419 s.write(b'PHA')
4420 self.assertEqual(s.recv(1024), b'OK\n')
4421 s.write(b'HASCERT')
4422 self.assertEqual(s.recv(1024), b'TRUE\n')
4423
4424 def test_pha_not_tls13(self):
4425 # TLS 1.2
4426 client_context, server_context, hostname = testing_context()
4427 server_context.verify_mode = ssl.CERT_REQUIRED
4428 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4429 client_context.post_handshake_auth = True
4430 client_context.load_cert_chain(SIGNED_CERTFILE)
4431
4432 server = ThreadedEchoServer(context=server_context, chatty=False)
4433 with server:
4434 with client_context.wrap_socket(socket.socket(),
4435 server_hostname=hostname) as s:
4436 s.connect((HOST, server.port))
4437 # PHA fails for TLS != 1.3
4438 s.write(b'PHA')
4439 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4440
Christian Heimesf22c4cf2019-07-01 09:25:48 +02004441 def test_bpo37428_pha_cert_none(self):
4442 # verify that post_handshake_auth does not implicitly enable cert
4443 # validation.
4444 hostname = SIGNED_CERTFILE_HOSTNAME
4445 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4446 client_context.post_handshake_auth = True
4447 client_context.load_cert_chain(SIGNED_CERTFILE)
4448 # no cert validation and CA on client side
4449 client_context.check_hostname = False
4450 client_context.verify_mode = ssl.CERT_NONE
4451
4452 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4453 server_context.load_cert_chain(SIGNED_CERTFILE)
4454 server_context.load_verify_locations(SIGNING_CA)
4455 server_context.post_handshake_auth = True
4456 server_context.verify_mode = ssl.CERT_REQUIRED
4457
4458 server = ThreadedEchoServer(context=server_context, chatty=False)
4459 with server:
4460 with client_context.wrap_socket(socket.socket(),
4461 server_hostname=hostname) as s:
4462 s.connect((HOST, server.port))
4463 s.write(b'HASCERT')
4464 self.assertEqual(s.recv(1024), b'FALSE\n')
4465 s.write(b'PHA')
4466 self.assertEqual(s.recv(1024), b'OK\n')
4467 s.write(b'HASCERT')
4468 self.assertEqual(s.recv(1024), b'TRUE\n')
4469 # server cert has not been validated
4470 self.assertEqual(s.getpeercert(), {})
4471
Christian Heimes9fb051f2018-09-23 08:32:31 +02004472
Christian Heimesc7f70692019-05-31 11:44:05 +02004473HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4474requires_keylog = unittest.skipUnless(
4475 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4476
4477class TestSSLDebug(unittest.TestCase):
4478
4479 def keylog_lines(self, fname=support.TESTFN):
4480 with open(fname) as f:
4481 return len(list(f))
4482
4483 @requires_keylog
4484 def test_keylog_defaults(self):
4485 self.addCleanup(support.unlink, support.TESTFN)
4486 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4487 self.assertEqual(ctx.keylog_filename, None)
4488
4489 self.assertFalse(os.path.isfile(support.TESTFN))
4490 ctx.keylog_filename = support.TESTFN
4491 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4492 self.assertTrue(os.path.isfile(support.TESTFN))
4493 self.assertEqual(self.keylog_lines(), 1)
4494
4495 ctx.keylog_filename = None
4496 self.assertEqual(ctx.keylog_filename, None)
4497
4498 with self.assertRaises((IsADirectoryError, PermissionError)):
4499 # Windows raises PermissionError
4500 ctx.keylog_filename = os.path.dirname(
4501 os.path.abspath(support.TESTFN))
4502
4503 with self.assertRaises(TypeError):
4504 ctx.keylog_filename = 1
4505
4506 @requires_keylog
4507 def test_keylog_filename(self):
4508 self.addCleanup(support.unlink, support.TESTFN)
4509 client_context, server_context, hostname = testing_context()
4510
4511 client_context.keylog_filename = support.TESTFN
4512 server = ThreadedEchoServer(context=server_context, chatty=False)
4513 with server:
4514 with client_context.wrap_socket(socket.socket(),
4515 server_hostname=hostname) as s:
4516 s.connect((HOST, server.port))
4517 # header, 5 lines for TLS 1.3
4518 self.assertEqual(self.keylog_lines(), 6)
4519
4520 client_context.keylog_filename = None
4521 server_context.keylog_filename = support.TESTFN
4522 server = ThreadedEchoServer(context=server_context, chatty=False)
4523 with server:
4524 with client_context.wrap_socket(socket.socket(),
4525 server_hostname=hostname) as s:
4526 s.connect((HOST, server.port))
4527 self.assertGreaterEqual(self.keylog_lines(), 11)
4528
4529 client_context.keylog_filename = support.TESTFN
4530 server_context.keylog_filename = support.TESTFN
4531 server = ThreadedEchoServer(context=server_context, chatty=False)
4532 with server:
4533 with client_context.wrap_socket(socket.socket(),
4534 server_hostname=hostname) as s:
4535 s.connect((HOST, server.port))
4536 self.assertGreaterEqual(self.keylog_lines(), 21)
4537
4538 client_context.keylog_filename = None
4539 server_context.keylog_filename = None
4540
4541 @requires_keylog
4542 @unittest.skipIf(sys.flags.ignore_environment,
4543 "test is not compatible with ignore_environment")
4544 def test_keylog_env(self):
4545 self.addCleanup(support.unlink, support.TESTFN)
4546 with unittest.mock.patch.dict(os.environ):
4547 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4548 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4549
4550 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4551 self.assertEqual(ctx.keylog_filename, None)
4552
4553 ctx = ssl.create_default_context()
4554 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4555
4556 ctx = ssl._create_stdlib_context()
4557 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4558
4559 def test_msg_callback(self):
4560 client_context, server_context, hostname = testing_context()
4561
4562 def msg_cb(conn, direction, version, content_type, msg_type, data):
4563 pass
4564
4565 self.assertIs(client_context._msg_callback, None)
4566 client_context._msg_callback = msg_cb
4567 self.assertIs(client_context._msg_callback, msg_cb)
4568 with self.assertRaises(TypeError):
4569 client_context._msg_callback = object()
4570
4571 def test_msg_callback_tls12(self):
4572 client_context, server_context, hostname = testing_context()
4573 client_context.options |= ssl.OP_NO_TLSv1_3
4574
4575 msg = []
4576
4577 def msg_cb(conn, direction, version, content_type, msg_type, data):
4578 self.assertIsInstance(conn, ssl.SSLSocket)
4579 self.assertIsInstance(data, bytes)
4580 self.assertIn(direction, {'read', 'write'})
4581 msg.append((direction, version, content_type, msg_type))
4582
4583 client_context._msg_callback = msg_cb
4584
4585 server = ThreadedEchoServer(context=server_context, chatty=False)
4586 with server:
4587 with client_context.wrap_socket(socket.socket(),
4588 server_hostname=hostname) as s:
4589 s.connect((HOST, server.port))
4590
Christian Heimese35d1ba2019-06-03 20:40:15 +02004591 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004592 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4593 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004594 msg
4595 )
4596 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004597 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4598 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004599 msg
4600 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004601
4602
Thomas Woutersed03b412007-08-28 21:37:11 +00004603def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004604 if support.verbose:
4605 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004606 'Mac': platform.mac_ver,
4607 'Windows': platform.win32_ver,
4608 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004609 for name, func in plats.items():
4610 plat = func()
4611 if plat and plat[0]:
4612 plat = '%s %r' % (name, plat)
4613 break
4614 else:
4615 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004616 print("test_ssl: testing with %r %r" %
4617 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4618 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004619 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004620 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4621 try:
4622 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4623 except AttributeError:
4624 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004625
Antoine Pitrou152efa22010-05-16 18:19:27 +00004626 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004627 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004628 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004629 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004630 BADCERT, BADKEY, EMPTYCERT]:
4631 if not os.path.exists(filename):
4632 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004633
Martin Panter3840b2a2016-03-27 01:53:46 +00004634 tests = [
4635 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004636 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004637 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004638 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004639
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004640 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004641 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004642
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004643 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004644 try:
4645 support.run_unittest(*tests)
4646 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004647 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004648
4649if __name__ == "__main__":
4650 test_main()