blob: d2b9e2046d0eecf7132db24640474f7f19c5ae45 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Victor Stinner8f4ef3b2019-07-01 18:28:25 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
# Py_DEBUG is inferred from the presence of sys.gettotalrefcount.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'

# Sorted keys of the ssl module's protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags derived from the version string / version tuple reported by ssl.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build-time configuration variable; None when the build did not record it.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000040
# Map each available ssl.PROTOCOL_* constant to its ssl.TLSVersion member.
# Pairs whose protocol constant or TLSVersion member is missing from this
# build of the ssl module are silently left out.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in (
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
):
    try:
        # Both names are resolved from strings to the real objects.
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        pass
    else:
        PROTOCOL_TO_TLS_VERSION[proto] = ver
53
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The *name* components are joined, in order, onto the directory that
    contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000056
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Paths are kept both as str and, via os.fsencode(), as bytes.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# presumably the password of the two *_PROTECTED files above -- confirm
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
# Individual files inside the CAPATH directory.
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
75
# Expected result of decoding CERTFILE (compared in test_parse_cert).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000091
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
# Hostname that testing_context() pairs with SIGNED_CERTFILE.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010098
# Expected result of decoding SIGNED_CERTFILE (compared in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # ASN1_TIME_print() pads a single-digit day with a leading space, so the
    # day field is always two characters wide ("Jul  7", not "Jul 7").
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
116
# Hostname that testing_context() pairs with SIGNED_CERTFILE2.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately empty, broken or missing inputs and special-case certificates.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)

# Not defined in all versions of OpenSSL
# (each name falls back to 0 when the ssl module lacks the option)
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200147
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100148
def handle_error(prefix):
    """Write *prefix* and the current exception's traceback to stdout.

    Nothing is written unless support.verbose is set.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_text = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_text)
Thomas Woutersed03b412007-08-28 21:37:11 +0000153
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +0000157
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
161
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
165
def _have_secp_curves():
    """Probe whether the "secp384r1" ECDH curve can be selected.

    Returns False when ssl.HAS_ECDH is false or when a server-side context
    rejects the curve with ValueError; True otherwise.
    """
    if ssl.HAS_ECDH:
        probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe.set_ecdh_curve("secp384r1")
        except ValueError:
            pass
        else:
            return True
    return False


HAVE_SECP_CURVES = _have_secp_curves()
179
180
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
186
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    Some versions of OpenSSL ignore seconds, see #18207.  On every other
    API version *cert_time* is returned unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    adjusted = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000200
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
202
Antoine Pitrou23df4832010-08-04 17:14:06 +0000203
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly configured SSLContext.

    A context is created for *ssl_version* and set up from the keyword-only
    arguments; every option left as None is skipped.  Remaining keyword
    arguments are forwarded to SSLContext.wrap_socket(), whose result is
    returned.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # check_hostname is switched off before verify_mode is set to
        # CERT_NONE.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    return ctx.wrap_socket(sock, **kwargs)
220
Christian Heimesa170fa12017-09-15 20:27:30 +0200221
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching client/server context pair.

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for cert_path, cert_hostname in known_certs:
        if server_cert == cert_path:
            hostname = cert_hostname
            break
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
242
243
Antoine Pitrou152efa22010-05-16 18:19:27 +0000244class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000245
Antoine Pitrou480a1242010-04-28 21:37:09 +0000246 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000247 ssl.CERT_NONE
248 ssl.CERT_OPTIONAL
249 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100250 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100251 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100252 if ssl.HAS_ECDH:
253 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100254 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
255 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000256 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100257 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700258 ssl.OP_NO_SSLv2
259 ssl.OP_NO_SSLv3
260 ssl.OP_NO_TLSv1
261 ssl.OP_NO_TLSv1_3
262 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
263 ssl.OP_NO_TLSv1_1
264 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200265 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000266
Christian Heimes9d50ab52018-02-27 10:17:30 +0100267 def test_private_init(self):
268 with self.assertRaisesRegex(TypeError, "public constructor"):
269 with socket.socket() as s:
270 ssl.SSLSocket(s)
271
Antoine Pitrou172f0252014-04-18 20:33:08 +0200272 def test_str_for_enums(self):
273 # Make sure that the PROTOCOL_* constants have enum-like string
274 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200275 proto = ssl.PROTOCOL_TLS
276 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200277 ctx = ssl.SSLContext(proto)
278 self.assertIs(ctx.protocol, proto)
279
Antoine Pitrou480a1242010-04-28 21:37:09 +0000280 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000282 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000283 sys.stdout.write("\n RAND_status is %d (%s)\n"
284 % (v, (v and "sufficient randomness") or
285 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200286
287 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
288 self.assertEqual(len(data), 16)
289 self.assertEqual(is_cryptographic, v == 1)
290 if v:
291 data = ssl.RAND_bytes(16)
292 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200293 else:
294 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200295
Victor Stinner1e81a392013-12-19 16:47:04 +0100296 # negative num is invalid
297 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
298 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
299
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100300 if hasattr(ssl, 'RAND_egd'):
301 self.assertRaises(TypeError, ssl.RAND_egd, 1)
302 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000303 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200304 ssl.RAND_add(b"this is a random bytes object", 75.0)
305 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000306
    @unittest.skipUnless(os.name == 'posix', 'requires posix')
    def test_random_fork(self):
        # Check that a forked child and its parent draw different
        # pseudo-random bytes from OpenSSL.
        status = ssl.RAND_status()
        if not status:
            self.fail("OpenSSL's PRNG has insufficient randomness")

        # The child sends its random bytes to the parent through this pipe.
        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: never return into the test runner; report the outcome
            # only through the exit status.
            try:
                os.close(rfd)
                child_random = ssl.RAND_pseudo_bytes(16)[0]
                self.assertEqual(len(child_random), 16)
                os.write(wfd, child_random)
                os.close(wfd)
            except BaseException:
                os._exit(1)
            else:
                os._exit(0)
        else:
            # Parent: wait for the child, then read what it wrote.
            os.close(wfd)
            self.addCleanup(os.close, rfd)
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0)

            child_random = os.read(rfd, 16)
            self.assertEqual(len(child_random), 16)
            parent_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(parent_random), 16)

            self.assertNotEqual(child_random, parent_random)
338
    # unittest attribute: None removes the length limit on failure diffs.
    maxDiff = None
340
Antoine Pitrou480a1242010-04-28 21:37:09 +0000341 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000342 # note that this uses an 'unofficial' function in _ssl.c,
343 # provided solely for this test, to exercise the certificate
344 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100345 self.assertEqual(
346 ssl._ssl._test_decode_cert(CERTFILE),
347 CERTFILE_INFO
348 )
349 self.assertEqual(
350 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
351 SIGNED_CERTFILE_INFO
352 )
353
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200354 # Issue #13034: the subjectAltName in some certificates
355 # (notably projects.developer.nokia.com:443) wasn't parsed
356 p = ssl._ssl._test_decode_cert(NOKIACERT)
357 if support.verbose:
358 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
359 self.assertEqual(p['subjectAltName'],
360 (('DNS', 'projects.developer.nokia.com'),
361 ('DNS', 'projects.forum.nokia.com'))
362 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100363 # extra OCSP and AIA fields
364 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
365 self.assertEqual(p['caIssuers'],
366 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
367 self.assertEqual(p['crlDistributionPoints'],
368 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000369
Christian Heimesa37f5242019-01-15 23:47:42 +0100370 def test_parse_cert_CVE_2019_5010(self):
371 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
372 if support.verbose:
373 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
374 self.assertEqual(
375 p,
376 {
377 'issuer': (
378 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
379 'notAfter': 'Jun 14 18:00:58 2028 GMT',
380 'notBefore': 'Jun 18 18:00:58 2018 GMT',
381 'serialNumber': '02',
382 'subject': ((('countryName', 'UK'),),
383 (('commonName',
384 'codenomicon-vm-2.test.lal.cisco.com'),)),
385 'subjectAltName': (
386 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
387 'version': 3
388 }
389 )
390
Christian Heimes824f7f32013-08-17 00:54:47 +0200391 def test_parse_cert_CVE_2013_4238(self):
392 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
393 if support.verbose:
394 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
395 subject = ((('countryName', 'US'),),
396 (('stateOrProvinceName', 'Oregon'),),
397 (('localityName', 'Beaverton'),),
398 (('organizationName', 'Python Software Foundation'),),
399 (('organizationalUnitName', 'Python Core Development'),),
400 (('commonName', 'null.python.org\x00example.org'),),
401 (('emailAddress', 'python-dev@python.org'),))
402 self.assertEqual(p['subject'], subject)
403 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200404 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
405 san = (('DNS', 'altnull.python.org\x00example.com'),
406 ('email', 'null@python.org\x00user@example.org'),
407 ('URI', 'http://null.python.org\x00http://example.org'),
408 ('IP Address', '192.0.2.1'),
409 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
410 else:
411 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
412 san = (('DNS', 'altnull.python.org\x00example.com'),
413 ('email', 'null@python.org\x00user@example.org'),
414 ('URI', 'http://null.python.org\x00http://example.org'),
415 ('IP Address', '192.0.2.1'),
416 ('IP Address', '<invalid>'))
417
418 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200419
Christian Heimes1c03abd2016-09-06 23:25:35 +0200420 def test_parse_all_sans(self):
421 p = ssl._ssl._test_decode_cert(ALLSANFILE)
422 self.assertEqual(p['subjectAltName'],
423 (
424 ('DNS', 'allsans'),
425 ('othername', '<unsupported>'),
426 ('othername', '<unsupported>'),
427 ('email', 'user@example.org'),
428 ('DNS', 'www.example.org'),
429 ('DirName',
430 ((('countryName', 'XY'),),
431 (('localityName', 'Castle Anthrax'),),
432 (('organizationName', 'Python Software Foundation'),),
433 (('commonName', 'dirname example'),))),
434 ('URI', 'https://www.python.org/'),
435 ('IP Address', '127.0.0.1'),
436 ('IP Address', '0:0:0:0:0:0:0:1\n'),
437 ('Registered ID', '1.2.3.4.5')
438 )
439 )
440
Antoine Pitrou480a1242010-04-28 21:37:09 +0000441 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000442 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000443 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000444 d1 = ssl.PEM_cert_to_DER_cert(pem)
445 p2 = ssl.DER_cert_to_PEM_cert(d1)
446 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000447 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000448 if not p2.startswith(ssl.PEM_HEADER + '\n'):
449 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
450 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
451 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000452
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000453 def test_openssl_version(self):
454 n = ssl.OPENSSL_VERSION_NUMBER
455 t = ssl.OPENSSL_VERSION_INFO
456 s = ssl.OPENSSL_VERSION
457 self.assertIsInstance(n, int)
458 self.assertIsInstance(t, tuple)
459 self.assertIsInstance(s, str)
460 # Some sanity checks follow
461 # >= 0.9
462 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400463 # < 3.0
464 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000465 major, minor, fix, patch, status = t
466 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400467 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000468 self.assertGreaterEqual(minor, 0)
469 self.assertLess(minor, 256)
470 self.assertGreaterEqual(fix, 0)
471 self.assertLess(fix, 256)
472 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100473 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000474 self.assertGreaterEqual(status, 0)
475 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400476 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200477 if IS_LIBRESSL:
478 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100479 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400480 else:
481 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100482 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000483
Antoine Pitrou9d543662010-04-23 23:10:32 +0000484 @support.cpython_only
485 def test_refcycle(self):
486 # Issue #7943: an SSL object doesn't create reference cycles with
487 # itself.
488 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200489 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000490 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100491 with support.check_warnings(("", ResourceWarning)):
492 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100493 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000494
Antoine Pitroua468adc2010-09-14 14:43:44 +0000495 def test_wrapped_unconnected(self):
496 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200497 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000498 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200499 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100500 self.assertRaises(OSError, ss.recv, 1)
501 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
502 self.assertRaises(OSError, ss.recvfrom, 1)
503 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
504 self.assertRaises(OSError, ss.send, b'x')
505 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200506 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100507 self.assertRaises(NotImplementedError, ss.sendmsg,
508 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200509 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
510 self.assertRaises(NotImplementedError, ss.recvmsg_into,
511 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000512
Antoine Pitrou40f08742010-04-24 22:04:40 +0000513 def test_timeout(self):
514 # Issue #8524: when creating an SSL socket, the timeout of the
515 # original socket should be retained.
516 for timeout in (None, 0.0, 5.0):
517 s = socket.socket(socket.AF_INET)
518 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200519 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100520 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000521
Christian Heimesd0486372016-09-10 23:23:33 +0200522 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000523 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000524 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000525 "certfile must be specified",
526 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000527 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000528 "certfile must be specified for server-side operations",
529 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000530 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000531 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200532 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100533 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
534 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200535 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200536 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000537 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000538 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000539 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200540 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000541 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000542 ssl.wrap_socket(sock,
543 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000544 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200545 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000546 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000547 ssl.wrap_socket(sock,
548 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000549 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000550
Martin Panter3464ea22016-02-01 21:58:11 +0000551 def bad_cert_test(self, certfile):
552 """Check that trying to use the given client certificate fails"""
553 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
554 certfile)
555 sock = socket.socket()
556 self.addCleanup(sock.close)
557 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200558 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200559 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000560
561 def test_empty_cert(self):
562 """Wrapping with an empty cert file"""
563 self.bad_cert_test("nullcert.pem")
564
565 def test_malformed_cert(self):
566 """Wrapping with a badly formatted certificate (syntax error)"""
567 self.bad_cert_test("badcert.pem")
568
569 def test_malformed_key(self):
570 """Wrapping with a badly formatted key (syntax error)"""
571 self.bad_cert_test("badkey.pem")
572
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000573 def test_match_hostname(self):
574 def ok(cert, hostname):
575 ssl.match_hostname(cert, hostname)
576 def fail(cert, hostname):
577 self.assertRaises(ssl.CertificateError,
578 ssl.match_hostname, cert, hostname)
579
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100580 # -- Hostname matching --
581
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000582 cert = {'subject': ((('commonName', 'example.com'),),)}
583 ok(cert, 'example.com')
584 ok(cert, 'ExAmple.cOm')
585 fail(cert, 'www.example.com')
586 fail(cert, '.example.com')
587 fail(cert, 'example.org')
588 fail(cert, 'exampleXcom')
589
590 cert = {'subject': ((('commonName', '*.a.com'),),)}
591 ok(cert, 'foo.a.com')
592 fail(cert, 'bar.foo.a.com')
593 fail(cert, 'a.com')
594 fail(cert, 'Xa.com')
595 fail(cert, '.a.com')
596
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 # only match wildcards when they are the only thing
598 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530600 fail(cert, 'foo.com')
601 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000602 fail(cert, 'bar.com')
603 fail(cert, 'foo.a.com')
604 fail(cert, 'bar.foo.com')
605
Christian Heimes824f7f32013-08-17 00:54:47 +0200606 # NULL bytes are bad, CVE-2013-4073
607 cert = {'subject': ((('commonName',
608 'null.python.org\x00example.org'),),)}
609 ok(cert, 'null.python.org\x00example.org') # or raise an error?
610 fail(cert, 'example.org')
611 fail(cert, 'null.python.org')
612
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 # error cases with wildcards
614 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
615 fail(cert, 'bar.foo.a.com')
616 fail(cert, 'a.com')
617 fail(cert, 'Xa.com')
618 fail(cert, '.a.com')
619
620 cert = {'subject': ((('commonName', 'a.*.com'),),)}
621 fail(cert, 'a.foo.com')
622 fail(cert, 'a..com')
623 fail(cert, 'a.com')
624
625 # wildcard doesn't match IDNA prefix 'xn--'
626 idna = 'püthon.python.org'.encode("idna").decode("ascii")
627 cert = {'subject': ((('commonName', idna),),)}
628 ok(cert, idna)
629 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
630 fail(cert, idna)
631 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
632 fail(cert, idna)
633
634 # wildcard in first fragment and IDNA A-labels in sequent fragments
635 # are supported.
636 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
637 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530638 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
639 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100640 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
641 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
642
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000643 # Slightly fake real-world example
644 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
645 'subject': ((('commonName', 'linuxfrz.org'),),),
646 'subjectAltName': (('DNS', 'linuxfr.org'),
647 ('DNS', 'linuxfr.com'),
648 ('othername', '<unsupported>'))}
649 ok(cert, 'linuxfr.org')
650 ok(cert, 'linuxfr.com')
651 # Not a "DNS" entry
652 fail(cert, '<unsupported>')
653 # When there is a subjectAltName, commonName isn't used
654 fail(cert, 'linuxfrz.org')
655
656 # A pristine real-world example
657 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('organizationName', 'Google Inc'),),
662 (('commonName', 'mail.google.com'),))}
663 ok(cert, 'mail.google.com')
664 fail(cert, 'gmail.com')
665 # Only commonName is considered
666 fail(cert, 'California')
667
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100668 # -- IPv4 matching --
669 cert = {'subject': ((('commonName', 'example.com'),),),
670 'subjectAltName': (('DNS', 'example.com'),
671 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200672 ('IP Address', '14.15.16.17'),
673 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100674 ok(cert, '10.11.12.13')
675 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200676 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
677 fail(cert, '127.1')
678 fail(cert, '14.15.16.17 ')
679 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100680 fail(cert, '14.15.16.18')
681 fail(cert, 'example.net')
682
683 # -- IPv6 matching --
Zackery Spytzc2cda632019-06-30 09:24:43 -0600684 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100685 cert = {'subject': ((('commonName', 'example.com'),),),
686 'subjectAltName': (
687 ('DNS', 'example.com'),
688 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
689 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
690 ok(cert, '2001::cafe')
691 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200692 fail(cert, '2003::baba ')
693 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100694 fail(cert, '2003::bebe')
695 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100696
697 # -- Miscellaneous --
698
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000699 # Neither commonName nor subjectAltName
700 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
701 'subject': ((('countryName', 'US'),),
702 (('stateOrProvinceName', 'California'),),
703 (('localityName', 'Mountain View'),),
704 (('organizationName', 'Google Inc'),))}
705 fail(cert, 'mail.google.com')
706
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200707 # No DNS entry in subjectAltName but a commonName
708 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
709 'subject': ((('countryName', 'US'),),
710 (('stateOrProvinceName', 'California'),),
711 (('localityName', 'Mountain View'),),
712 (('commonName', 'mail.google.com'),)),
713 'subjectAltName': (('othername', 'blabla'), )}
714 ok(cert, 'mail.google.com')
715
716 # No DNS entry subjectAltName and no commonName
717 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
718 'subject': ((('countryName', 'US'),),
719 (('stateOrProvinceName', 'California'),),
720 (('localityName', 'Mountain View'),),
721 (('organizationName', 'Google Inc'),)),
722 'subjectAltName': (('othername', 'blabla'),)}
723 fail(cert, 'google.com')
724
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000725 # Empty cert / no cert
726 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
727 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
728
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200729 # Issue #17980: avoid denials of service by refusing more than one
730 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100731 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
732 with self.assertRaisesRegex(
733 ssl.CertificateError,
734 "partial wildcards in leftmost label are not supported"):
735 ssl.match_hostname(cert, 'axxb.example.com')
736
737 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
738 with self.assertRaisesRegex(
739 ssl.CertificateError,
740 "wildcard can only be present in the leftmost label"):
741 ssl.match_hostname(cert, 'www.sub.example.com')
742
743 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
744 with self.assertRaisesRegex(
745 ssl.CertificateError,
746 "too many wildcards"):
747 ssl.match_hostname(cert, 'axxbxxc.example.com')
748
749 cert = {'subject': ((('commonName', '*'),),)}
750 with self.assertRaisesRegex(
751 ssl.CertificateError,
752 "sole wildcard without additional labels are not support"):
753 ssl.match_hostname(cert, 'host')
754
755 cert = {'subject': ((('commonName', '*.com'),),)}
756 with self.assertRaisesRegex(
757 ssl.CertificateError,
758 r"hostname 'com' doesn't match '\*.com'"):
759 ssl.match_hostname(cert, 'com')
760
761 # extra checks for _inet_paton()
762 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
763 with self.assertRaises(ValueError):
764 ssl._inet_paton(invalid)
765 for ipaddr in ['127.0.0.1', '192.168.0.1']:
766 self.assertTrue(ssl._inet_paton(ipaddr))
Zackery Spytzc2cda632019-06-30 09:24:43 -0600767 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100768 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
769 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200770
Antoine Pitroud5323212010-10-22 18:19:07 +0000771 def test_server_side(self):
772 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200773 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000774 with socket.socket() as sock:
775 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
776 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000777
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type."""
    s = socket.create_server(('127.0.0.1', 0))
    # Register the cleanups immediately so neither socket leaks when
    # connect() or the assertion below fails part-way through the test.
    self.addCleanup(s.close)
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200787
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket reports no 'tls-unique' channel binding."""
    # First the client-side wrapper, then the same check server-side.
    wrap_variants = (
        {},
        {"server_side": True, "certfile": CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_kwargs) as wrapped:
            binding = wrapped.get_channel_binding("tls-unique")
            self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200799
def test_dealloc_warn(self):
    """Dropping an unclosed wrapped socket emits a ResourceWarning."""
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    wrapped_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Release the only reference, then force a collection.
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(wrapped_repr, warning_text)
807
def test_get_default_verify_paths(self):
    """get_default_verify_paths() honours SSL_CERT_DIR / SSL_CERT_FILE."""
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    overrides = (
        ("SSL_CERT_DIR", CAPATH),
        ("SSL_CERT_FILE", CERTFILE),
    )
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides:
            env[variable] = value
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
819
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    """Check the shape of the entries returned by ssl.enum_certificates()."""
    store_names = ("CA", "ROOT")
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Collect every trust OID seen across both stores.
    seen_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            der_bytes, encoding, trust = entry
            self.assertIsInstance(der_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200844
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    """Check the shape of the entries returned by ssl.enum_crls()."""
    self.assertTrue(ssl.enum_crls("CA"))
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    entries = ssl.enum_crls("CA")
    self.assertIsInstance(entries, list)
    for entry in entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry[0], entry[1]
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200858
Christian Heimes46bebee2013-06-09 19:03:31 +0200859
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100860 def test_asn1object(self):
861 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
862 '1.3.6.1.5.5.7.3.1')
863
864 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
865 self.assertEqual(val, expected)
866 self.assertEqual(val.nid, 129)
867 self.assertEqual(val.shortname, 'serverAuth')
868 self.assertEqual(val.longname, 'TLS Web Server Authentication')
869 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
870 self.assertIsInstance(val, ssl._ASN1Object)
871 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
872
873 val = ssl._ASN1Object.fromnid(129)
874 self.assertEqual(val, expected)
875 self.assertIsInstance(val, ssl._ASN1Object)
876 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100877 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
878 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100879 for i in range(1000):
880 try:
881 obj = ssl._ASN1Object.fromnid(i)
882 except ValueError:
883 pass
884 else:
885 self.assertIsInstance(obj.nid, int)
886 self.assertIsInstance(obj.shortname, str)
887 self.assertIsInstance(obj.longname, str)
888 self.assertIsInstance(obj.oid, (str, type(None)))
889
890 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
891 self.assertEqual(val, expected)
892 self.assertIsInstance(val, ssl._ASN1Object)
893 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
894 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
895 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100896 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
897 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100898
Christian Heimes72d28502013-11-23 13:56:58 +0100899 def test_purpose_enum(self):
900 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
901 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
902 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
903 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
904 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
905 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
906 '1.3.6.1.5.5.7.3.1')
907
908 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
909 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
910 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
911 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
912 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
913 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
914 '1.3.6.1.5.5.7.3.2')
915
def test_unsupported_dtls(self):
    """Wrapping a datagram socket raises NotImplementedError."""
    expected_message = "only stream sockets are supported"
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    # Through the module-level test helper ...
    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    # ... and through an explicit client context.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception), expected_message)
926
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp*."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
929
def cert_time_fail(self, timestring):
    """Assert that *timestring* is rejected with ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
933
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    """Conversion must not depend on the local timezone (issue #19940)."""
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, seconds in known_conversions:
        self.cert_time_ok(cert_time, seconds)
941
def test_cert_time_to_seconds(self):
    """Check accepted and rejected inputs of ssl.cert_time_to_seconds()."""
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    by_keyword = ssl.cert_time_to_seconds(cert_time=reference)
    self.assertEqual(by_keyword, reference_ts)

    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for bad_time in malformed:
        self.cert_time_fail(bad_time)

    newyear_ts = 1230768000.0
    second_boundaries = (
        # leap seconds
        ("Dec 31 23:59:60 2008 GMT", newyear_ts),
        # same timestamp
        ("Jan 1 00:00:00 2009 GMT", newyear_ts),
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for cert_time, seconds in second_boundaries:
        self.cert_time_ok(cert_time, seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
976
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    """cert_time_to_seconds() must not use locale-specific month names."""
    # Abbreviated name of February as formatted under the active locale.
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale spelling is accepted, the localized one is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
991
def test_connect_ex_error(self):
    """connect_ex() to a bound but non-listening port returns an errno."""
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
1007
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001008
Antoine Pitrou152efa22010-05-16 18:19:27 +00001009class ContextTests(unittest.TestCase):
1010
def test_constructor(self):
    """SSLContext accepts every known protocol and rejects bogus ones."""
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
1018
def test_protocol(self):
    """The protocol attribute echoes the constructor argument."""
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
1023
1024 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001025 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026 ctx.set_ciphers("ALL")
1027 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001028 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001029 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001030
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    """No enabled cipher suite name contains a disallowed marker."""
    disallowed = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in disallowed:
            self.assertNotIn(marker, suite_name)
1043
@unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
def test_get_ciphers(self):
    """get_ciphers() lists the AES-GCM suites after restricting to AESGCM."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.set_ciphers('AESGCM')
    enabled_names = {suite['name'] for suite in client_ctx.get_ciphers()}
    for required in ('AES256-GCM-SHA384', 'AES128-GCM-SHA256'):
        self.assertIn(required, enabled_names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001051
def test_options(self):
    """Check the default SSLContext.options value and how it can change."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    for extra_flag in (OP_NO_COMPRESSION, OP_CIPHER_SERVER_PREFERENCE,
                       OP_SINGLE_DH_USE, OP_SINGLE_ECDH_USE,
                       OP_ENABLE_MIDDLEBOX_COMPAT):
        expected |= extra_flag
    self.assertEqual(expected, client_ctx.options)

    client_ctx.options = client_ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options &= ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    remaining = client_ctx.options & ~ssl.OP_NO_SSLv3
    self.assertEqual(0, remaining)
1072
Christian Heimesa170fa12017-09-15 20:27:30 +02001073 def test_verify_mode_protocol(self):
1074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001075 # Default value
1076 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1077 ctx.verify_mode = ssl.CERT_OPTIONAL
1078 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1079 ctx.verify_mode = ssl.CERT_REQUIRED
1080 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1081 ctx.verify_mode = ssl.CERT_NONE
1082 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1083 with self.assertRaises(TypeError):
1084 ctx.verify_mode = None
1085 with self.assertRaises(ValueError):
1086 ctx.verify_mode = 42
1087
Christian Heimesa170fa12017-09-15 20:27:30 +02001088 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1089 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1090 self.assertFalse(ctx.check_hostname)
1091
1092 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1093 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1094 self.assertTrue(ctx.check_hostname)
1095
Christian Heimes61d478c2018-01-27 15:51:38 +01001096 def test_hostname_checks_common_name(self):
1097 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1100 ctx.hostname_checks_common_name = True
1101 self.assertTrue(ctx.hostname_checks_common_name)
1102 ctx.hostname_checks_common_name = False
1103 self.assertFalse(ctx.hostname_checks_common_name)
1104 ctx.hostname_checks_common_name = True
1105 self.assertTrue(ctx.hostname_checks_common_name)
1106 else:
1107 with self.assertRaises(AttributeError):
1108 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001109
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    """Check SSLContext.minimum_version / maximum_version behaviour.

    Covers the default values, explicit assignment, the MINIMUM_SUPPORTED
    and MAXIMUM_SUPPORTED placeholders, rejection of unknown values, and
    the restrictions on a context created for one fixed protocol version.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.  The same set of acceptable
    # defaults is used for every fresh context checked in this test.
    minimum_range = {
        ssl.TLSVersion.MINIMUM_SUPPORTED,
        # Fedora 29 uses TLS 1.0 by default
        ssl.TLSVersion.TLSv1,
        # RHEL 8 uses TLS 1.2 by default
        ssl.TLSVersion.TLSv1_2,
    }
    self.assertIn(ctx.minimum_version, minimum_range)
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Explicit versions round-trip unchanged.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_1
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.TLSv1_1
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1_2
    )

    ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    ctx.maximum_version = ssl.TLSVersion.TLSv1
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1
    )

    ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # A placeholder assigned to the opposite bound reads back as a
    # concrete version; which one is not fixed, so accept either.
    ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    self.assertIn(
        ctx.maximum_version,
        {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
    )

    ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
    )

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context bound to one protocol version rejects changes to its range.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    # Tolerate vendor-overridden defaults here as well, consistent with
    # the check on the first context above.
    self.assertIn(ctx.minimum_version, minimum_range)
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )
    with self.assertRaises(ValueError):
        ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        ctx.maximum_version = ssl.TLSVersion.TLSv1
1178
1179
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    """Check the default verify_flags value and that flags round-trip."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1199
def test_load_cert_chain(self):
    """Exercise load_cert_chain() with good, broken and protected files."""
    pem_error = "PEM lib"

    def new_server_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    def password_forms():
        # Fresh str / bytes / bytearray spellings of the key password.
        return (KEY_PASSWORD,
                KEY_PASSWORD.encode(),
                bytearray(KEY_PASSWORD.encode()))

    ctx = new_server_context()
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for broken_file in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(broken_file)

    # Separate key and cert
    ctx = new_server_context()
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for half_only in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(half_only)
    with self.assertRaisesRegex(ssl.SSLError, pem_error):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = new_server_context()
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def give_str():
        return KEY_PASSWORD

    def give_bytes():
        return KEY_PASSWORD.encode()

    def give_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def give_wrong():
        return "badpass"

    def give_huge():
        return b'a' * (1024 * 1024)

    def give_wrong_type():
        return 9

    def give_exception():
        raise Exception('getpass error')

    class PasswordSource:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        give_str,
        give_bytes,
        give_bytearray,
        PasswordSource(),
        PasswordSource().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=give_wrong)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=give_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=give_wrong_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=give_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=give_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001282
def test_load_verify_locations(self):
    """Exercise load_verify_locations() argument handling and errors."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    load = ctx.load_verify_locations

    # str and bytes CA files, positionally and by keyword.
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        load(ca_file)
        load(cafile=ca_file, capath=None)

    # No arguments, or nothing but None, is a TypeError.
    for empty_args in ((), (None, None, None)):
        with self.assertRaises(TypeError):
            load(*empty_args)

    with self.assertRaises(OSError) as caught:
        load(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        load(BADCERT)
    load(CERTFILE, CAPATH)
    load(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        load(None, True)
1301
Christian Heimesefff7062013-11-21 03:35:02 +01001302 def test_load_verify_cadata(self):
1303 # test cadata
1304 with open(CAFILE_CACERT) as f:
1305 cacert_pem = f.read()
1306 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1307 with open(CAFILE_NEURONIO) as f:
1308 neuronio_pem = f.read()
1309 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1310
1311 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001312 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001313 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1314 ctx.load_verify_locations(cadata=cacert_pem)
1315 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1316 ctx.load_verify_locations(cadata=neuronio_pem)
1317 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1318 # cert already in hash table
1319 ctx.load_verify_locations(cadata=neuronio_pem)
1320 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1321
1322 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001323 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001324 combined = "\n".join((cacert_pem, neuronio_pem))
1325 ctx.load_verify_locations(cadata=combined)
1326 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1327
1328 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001329 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001330 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1331 neuronio_pem, "tail"]
1332 ctx.load_verify_locations(cadata="\n".join(combined))
1333 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1334
1335 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001336 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001337 ctx.load_verify_locations(cadata=cacert_der)
1338 ctx.load_verify_locations(cadata=neuronio_der)
1339 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1340 # cert already in hash table
1341 ctx.load_verify_locations(cadata=cacert_der)
1342 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1343
1344 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001345 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001346 combined = b"".join((cacert_der, neuronio_der))
1347 ctx.load_verify_locations(cadata=combined)
1348 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1349
1350 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001351 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001352 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1353
1354 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1355 ctx.load_verify_locations(cadata="broken")
1356 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1357 ctx.load_verify_locations(cadata=b"broken")
1358
1359
Paul Monsonf3550692019-06-19 13:09:54 -07001360 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001361 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001363 ctx.load_dh_params(DHFILE)
1364 if os.name != 'nt':
1365 ctx.load_dh_params(BYTES_DHFILE)
1366 self.assertRaises(TypeError, ctx.load_dh_params)
1367 self.assertRaises(TypeError, ctx.load_dh_params, None)
1368 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001369 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001370 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001371 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001372 ctx.load_dh_params(CERTFILE)
1373
Antoine Pitroub0182c82010-10-12 20:09:02 +00001374 def test_session_stats(self):
1375 for proto in PROTOCOLS:
1376 ctx = ssl.SSLContext(proto)
1377 self.assertEqual(ctx.session_stats(), {
1378 'number': 0,
1379 'connect': 0,
1380 'connect_good': 0,
1381 'connect_renegotiate': 0,
1382 'accept': 0,
1383 'accept_good': 0,
1384 'accept_renegotiate': 0,
1385 'hits': 0,
1386 'misses': 0,
1387 'timeouts': 0,
1388 'cache_full': 0,
1389 })
1390
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001391 def test_set_default_verify_paths(self):
1392 # There's not much we can do to test that it acts as expected,
1393 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001394 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001395 ctx.set_default_verify_paths()
1396
Antoine Pitrou501da612011-12-21 09:27:41 +01001397 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001398 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001399 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001400 ctx.set_ecdh_curve("prime256v1")
1401 ctx.set_ecdh_curve(b"prime256v1")
1402 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1403 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1404 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1405 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1406
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001407 @needs_sni
1408 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001409 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001410
1411 # set_servername_callback expects a callable, or None
1412 self.assertRaises(TypeError, ctx.set_servername_callback)
1413 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1414 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1415 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1416
1417 def dummycallback(sock, servername, ctx):
1418 pass
1419 ctx.set_servername_callback(None)
1420 ctx.set_servername_callback(dummycallback)
1421
1422 @needs_sni
1423 def test_sni_callback_refcycle(self):
1424 # Reference cycles through the servername callback are detected
1425 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001426 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001427 def dummycallback(sock, servername, ctx, cycle=ctx):
1428 pass
1429 ctx.set_servername_callback(dummycallback)
1430 wr = weakref.ref(ctx)
1431 del ctx, dummycallback
1432 gc.collect()
1433 self.assertIs(wr(), None)
1434
Christian Heimes9a5395a2013-06-17 15:44:12 +02001435 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001436 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001437 self.assertEqual(ctx.cert_store_stats(),
1438 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1439 ctx.load_cert_chain(CERTFILE)
1440 self.assertEqual(ctx.cert_store_stats(),
1441 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1442 ctx.load_verify_locations(CERTFILE)
1443 self.assertEqual(ctx.cert_store_stats(),
1444 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001445 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001446 self.assertEqual(ctx.cert_store_stats(),
1447 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1448
1449 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001450 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001451 self.assertEqual(ctx.get_ca_certs(), [])
1452 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1453 ctx.load_verify_locations(CERTFILE)
1454 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001455 # but CAFILE_CACERT is a CA cert
1456 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001457 self.assertEqual(ctx.get_ca_certs(),
1458 [{'issuer': ((('organizationName', 'Root CA'),),
1459 (('organizationalUnitName', 'http://www.cacert.org'),),
1460 (('commonName', 'CA Cert Signing Authority'),),
1461 (('emailAddress', 'support@cacert.org'),)),
1462 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1463 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1464 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001465 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001466 'subject': ((('organizationName', 'Root CA'),),
1467 (('organizationalUnitName', 'http://www.cacert.org'),),
1468 (('commonName', 'CA Cert Signing Authority'),),
1469 (('emailAddress', 'support@cacert.org'),)),
1470 'version': 3}])
1471
Martin Panterb55f8b72016-01-14 12:53:56 +00001472 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001473 pem = f.read()
1474 der = ssl.PEM_cert_to_DER_cert(pem)
1475 self.assertEqual(ctx.get_ca_certs(True), [der])
1476
Christian Heimes72d28502013-11-23 13:56:58 +01001477 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001479 ctx.load_default_certs()
1480
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001482 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1483 ctx.load_default_certs()
1484
Christian Heimesa170fa12017-09-15 20:27:30 +02001485 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001486 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1487
Christian Heimesa170fa12017-09-15 20:27:30 +02001488 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001489 self.assertRaises(TypeError, ctx.load_default_certs, None)
1490 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1491
Benjamin Peterson91244e02014-10-03 18:17:15 -04001492 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001493 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001494 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001495 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001496 with support.EnvironmentVarGuard() as env:
1497 env["SSL_CERT_DIR"] = CAPATH
1498 env["SSL_CERT_FILE"] = CERTFILE
1499 ctx.load_default_certs()
1500 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1501
Benjamin Peterson91244e02014-10-03 18:17:15 -04001502 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001503 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001504 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001505 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001506 ctx.load_default_certs()
1507 stats = ctx.cert_store_stats()
1508
Christian Heimesa170fa12017-09-15 20:27:30 +02001509 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001510 with support.EnvironmentVarGuard() as env:
1511 env["SSL_CERT_DIR"] = CAPATH
1512 env["SSL_CERT_FILE"] = CERTFILE
1513 ctx.load_default_certs()
1514 stats["x509"] += 1
1515 self.assertEqual(ctx.cert_store_stats(), stats)
1516
Christian Heimes358cfd42016-09-10 22:43:48 +02001517 def _assert_context_options(self, ctx):
1518 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1519 if OP_NO_COMPRESSION != 0:
1520 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1521 OP_NO_COMPRESSION)
1522 if OP_SINGLE_DH_USE != 0:
1523 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1524 OP_SINGLE_DH_USE)
1525 if OP_SINGLE_ECDH_USE != 0:
1526 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1527 OP_SINGLE_ECDH_USE)
1528 if OP_CIPHER_SERVER_PREFERENCE != 0:
1529 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1530 OP_CIPHER_SERVER_PREFERENCE)
1531
Christian Heimes4c05b472013-11-23 15:58:30 +01001532 def test_create_default_context(self):
1533 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001534
Christian Heimesa170fa12017-09-15 20:27:30 +02001535 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001536 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001537 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001538 self._assert_context_options(ctx)
1539
Christian Heimes4c05b472013-11-23 15:58:30 +01001540 with open(SIGNING_CA) as f:
1541 cadata = f.read()
1542 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1543 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001544 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001545 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001546 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001547
1548 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001549 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001550 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001551 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001552
Christian Heimes67986f92013-11-23 22:43:47 +01001553 def test__create_stdlib_context(self):
1554 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001555 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001556 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001557 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001558 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001559
1560 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1561 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001563 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001564
1565 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001566 cert_reqs=ssl.CERT_REQUIRED,
1567 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001568 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1569 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001570 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001571 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001572
1573 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001574 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001575 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001576 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001577
Christian Heimes1aa9a752013-12-02 02:41:19 +01001578 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001579 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001580 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001581 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001582
Christian Heimese82c0342017-09-15 20:29:57 +02001583 # Auto set CERT_REQUIRED
1584 ctx.check_hostname = True
1585 self.assertTrue(ctx.check_hostname)
1586 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1587 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588 ctx.verify_mode = ssl.CERT_REQUIRED
1589 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001590 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001591
Christian Heimese82c0342017-09-15 20:29:57 +02001592 # Changing verify_mode does not affect check_hostname
1593 ctx.check_hostname = False
1594 ctx.verify_mode = ssl.CERT_NONE
1595 ctx.check_hostname = False
1596 self.assertFalse(ctx.check_hostname)
1597 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1598 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001599 ctx.check_hostname = True
1600 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001601 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1602
1603 ctx.check_hostname = False
1604 ctx.verify_mode = ssl.CERT_OPTIONAL
1605 ctx.check_hostname = False
1606 self.assertFalse(ctx.check_hostname)
1607 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1608 # keep CERT_OPTIONAL
1609 ctx.check_hostname = True
1610 self.assertTrue(ctx.check_hostname)
1611 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001612
1613 # Cannot set CERT_NONE with check_hostname enabled
1614 with self.assertRaises(ValueError):
1615 ctx.verify_mode = ssl.CERT_NONE
1616 ctx.check_hostname = False
1617 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001618 ctx.verify_mode = ssl.CERT_NONE
1619 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001620
Christian Heimes5fe668c2016-09-12 00:01:11 +02001621 def test_context_client_server(self):
1622 # PROTOCOL_TLS_CLIENT has sane defaults
1623 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1624 self.assertTrue(ctx.check_hostname)
1625 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1626
1627 # PROTOCOL_TLS_SERVER has different but also sane defaults
1628 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1629 self.assertFalse(ctx.check_hostname)
1630 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1631
Christian Heimes4df60f12017-09-15 20:26:05 +02001632 def test_context_custom_class(self):
1633 class MySSLSocket(ssl.SSLSocket):
1634 pass
1635
1636 class MySSLObject(ssl.SSLObject):
1637 pass
1638
1639 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1640 ctx.sslsocket_class = MySSLSocket
1641 ctx.sslobject_class = MySSLObject
1642
1643 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1644 self.assertIsInstance(sock, MySSLSocket)
1645 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1646 self.assertIsInstance(obj, MySSLObject)
1647
Christian Heimes78c7d522019-06-03 21:00:10 +02001648 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1649 def test_num_tickest(self):
1650 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1651 self.assertEqual(ctx.num_tickets, 2)
1652 ctx.num_tickets = 1
1653 self.assertEqual(ctx.num_tickets, 1)
1654 ctx.num_tickets = 0
1655 self.assertEqual(ctx.num_tickets, 0)
1656 with self.assertRaises(ValueError):
1657 ctx.num_tickets = -1
1658 with self.assertRaises(TypeError):
1659 ctx.num_tickets = None
1660
1661 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1662 self.assertEqual(ctx.num_tickets, 2)
1663 with self.assertRaises(ValueError):
1664 ctx.num_tickets = 1
1665
Antoine Pitrou152efa22010-05-16 18:19:27 +00001666
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno; the same holds
        # for a subclass.
        for exc_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = exc_type(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            ctx.load_dh_params(CERTFILE)
        err = caught.exception
        self.assertEqual(err.library, 'PEM')
        self.assertEqual(err.reason, 'NO_START_LINE')
        message = str(err)
        self.assertTrue(message.startswith("[PEM: NO_START_LINE] no start line"), message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.create_server(("127.0.0.1", 0)) as listener:
            raw = socket.create_connection(listener.getsockname())
            raw.setblocking(False)
            with ctx.wrap_socket(raw, False, do_handshake_on_connect=False) as tls:
                # The listener never accepts or answers, so the non-blocking
                # handshake has nothing to read yet.
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls.do_handshake()
                message = str(caught.exception)
                self.assertTrue(message.startswith("The operation did not complete (read)"), message)
                # For compatibility
                self.assertEqual(caught.exception.errno, ssl.SSL_ERROR_WANT_READ)


    def test_bad_server_hostname(self):
        ctx = ssl.create_default_context()
        # (expected exception, rejected server_hostname)
        rejected = (
            (ValueError, ""),
            (ValueError, ".example.org"),
            (TypeError, "example.org\x00evil.com"),
        )
        for exc_type, hostname in rejected:
            with self.assertRaises(exc_type):
                ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                             server_hostname=hostname)
Christian Heimes61d478c2018-01-27 15:51:38 +01001719
1720
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        mem = ssl.MemoryBIO()
        # One write is read back whole, then the BIO is empty.
        mem.write(b'foo')
        self.assertEqual(mem.read(), b'foo')
        self.assertEqual(mem.read(), b'')
        # Consecutive writes are read back concatenated.
        for piece in (b'foo', b'bar'):
            mem.write(piece)
        self.assertEqual(mem.read(), b'foobar')
        self.assertEqual(mem.read(), b'')
        # Sized reads consume the buffer piecewise.
        mem.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(mem.read(size), expected)

    def test_eof(self):
        mem = ssl.MemoryBIO()
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertFalse(mem.eof)
        mem.write(b'foo')
        self.assertFalse(mem.eof)
        # eof stays false after write_eof() while buffered data remains.
        mem.write_eof()
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(2), b'fo')
        self.assertFalse(mem.eof)
        # Draining the last byte flips eof to true, and it stays true.
        self.assertEqual(mem.read(1), b'o')
        self.assertTrue(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertTrue(mem.eof)

    def test_pending(self):
        mem = ssl.MemoryBIO()
        self.assertEqual(mem.pending, 0)
        mem.write(b'foo')
        self.assertEqual(mem.pending, 3)
        # Each one-byte read lowers the pending count by one.
        for remaining in (2, 1, 0):
            mem.read(1)
            self.assertEqual(mem.pending, remaining)
        # Each one-byte write raises it by one.
        for buffered in (1, 2, 3):
            mem.write(b'x')
            self.assertEqual(mem.pending, buffered)
        mem.read()
        self.assertEqual(mem.pending, 0)

    def test_buffer_types(self):
        mem = ssl.MemoryBIO()
        # (object written, bytes expected back)
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for written, expected in payloads:
            mem.write(written)
            self.assertEqual(mem.read(), expected)

    def test_error_types(self):
        mem = ssl.MemoryBIO()
        # write() rejects these values with TypeError.
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                mem.write(not_a_buffer)
1782
1783
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # SSLObject cannot be created by calling the class directly.
        mem = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(mem, mem)

    def test_unwrap(self):
        client_ctx, server_ctx, hostname = testing_context()
        c_in = ssl.MemoryBIO()
        c_out = ssl.MemoryBIO()
        s_in = ssl.MemoryBIO()
        s_out = ssl.MemoryBIO()
        client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
        server = server_ctx.wrap_bio(s_in, s_out, server_side=True)

        def advance(side):
            # Take one handshake step, tolerating a wait for peer data.
            try:
                side.do_handshake()
            except ssl.SSLWantReadError:
                pass

        def forward(outgoing, incoming):
            # Move any bytes one side produced into the other side's input.
            if outgoing.pending:
                incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance(client)
            forward(c_out, s_in)
            advance(server)
            forward(s_out, c_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        s_in.write(c_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        c_in.write(s_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001831
Martin Panter3840b2a2016-03-27 01:53:46 +00001832class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001833 """Tests that connect to a simple server running in the background"""
1834
1835 def setUp(self):
1836 server = ThreadedEchoServer(SIGNED_CERTFILE)
1837 self.server_addr = (HOST, server.port)
1838 server.__enter__()
1839 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001840
Antoine Pitrou480a1242010-04-28 21:37:09 +00001841 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001842 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 cert_reqs=ssl.CERT_NONE) as s:
1844 s.connect(self.server_addr)
1845 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001846 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001847
Martin Panter3840b2a2016-03-27 01:53:46 +00001848 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001849 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001850 cert_reqs=ssl.CERT_REQUIRED,
1851 ca_certs=SIGNING_CA) as s:
1852 s.connect(self.server_addr)
1853 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001854 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001855
Martin Panter3840b2a2016-03-27 01:53:46 +00001856 def test_connect_fail(self):
1857 # This should fail because we have no verification certs. Connection
1858 # failure crashes ThreadedEchoServer, so run this in an independent
1859 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001860 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001861 cert_reqs=ssl.CERT_REQUIRED)
1862 self.addCleanup(s.close)
1863 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1864 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001865
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001866 def test_connect_ex(self):
1867 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001868 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001869 cert_reqs=ssl.CERT_REQUIRED,
1870 ca_certs=SIGNING_CA)
1871 self.addCleanup(s.close)
1872 self.assertEqual(0, s.connect_ex(self.server_addr))
1873 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001874
1875 def test_non_blocking_connect_ex(self):
1876 # Issue #11326: non-blocking connect_ex() should allow handshake
1877 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001878 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001879 cert_reqs=ssl.CERT_REQUIRED,
1880 ca_certs=SIGNING_CA,
1881 do_handshake_on_connect=False)
1882 self.addCleanup(s.close)
1883 s.setblocking(False)
1884 rc = s.connect_ex(self.server_addr)
1885 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1886 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1887 # Wait for connect to finish
1888 select.select([], [s], [], 5.0)
1889 # Non-blocking handshake
1890 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001891 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001892 s.do_handshake()
1893 break
1894 except ssl.SSLWantReadError:
1895 select.select([s], [], [], 5.0)
1896 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001897 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001898 # SSL established
1899 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001900
def test_connect_with_context(self):
    """Connect using a separately created SSLContext (cf. test_connect).

    Three connections are made with the same context: one with the
    context's initial settings, one that also passes a server hostname,
    and one after switching to CERT_REQUIRED with SIGNING_CA loaded.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    # Initial settings: the peer certificate is expected to be an empty dict.
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())

    # Same with a server hostname
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as conn:
        conn.connect(self.server_addr)

    # This should succeed because we specify the root cert
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1918
def test_connect_with_context_fail(self):
    """Connecting with CERT_REQUIRED but no CA loaded must fail.

    Connection failure crashes ThreadedEchoServer, so this check lives
    in its own test method.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    # This should fail because we have no verification certs.
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001929
def test_connect_capath(self):
    """Verify server certificates using the `capath` argument.

    The check is run once with CAPATH and once with BYTES_CAPATH, each
    time on a fresh context.
    """
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001952
def test_connect_cadata(self):
    """Verify the server certificate using in-memory `cadata`.

    SIGNING_CA is read as PEM text, converted to DER, and each form is
    loaded into a fresh context before connecting.
    """
    with open(SIGNING_CA) as ca_file:
        pem_data = ca_file.read()
    der_data = ssl.PEM_cert_to_DER_cert(pem_data)

    # PEM first, then the same with DER
    for cadata in (pem_data, der_data):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001973
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Closing a makefile() object must not keep the real socket open.

    Issue #5238: creating a file-like object with makefile() shouldn't
    delay closing the underlying "real socket" (here tested with its
    file descriptor, hence skipping the test under Windows).
    """
    conn = test_wrap_socket(socket.socket(socket.AF_INET))
    conn.connect(self.server_addr)
    raw_fd = conn.fileno()
    file_obj = conn.makefile()
    file_obj.close()
    # The fd is still open
    os.read(raw_fd, 0)
    # Closing the SSL socket should close the fd too
    conn.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(raw_fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001992
def test_non_blocking_handshake(self):
    """Drive a TLS handshake by hand on a non-blocking socket.

    do_handshake() is retried until it completes; between attempts
    select() waits for the direction the SSL layer asked for.
    """
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    conn = test_wrap_socket(raw_sock,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [conn], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002013
def test_get_server_certificate(self):
    """Fetch the test server's certificate, also verifying with SIGNING_CA."""
    server_addr = self.server_addr
    _test_get_server_certificate(self, *server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002016
def test_get_server_certificate_fail(self):
    """Run the failing certificate fetch against the test server.

    Connection failure crashes ThreadedEchoServer, so this runs in an
    independent test method.
    """
    server_addr = self.server_addr
    _test_get_server_certificate_fail(self, *server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002021
def test_ciphers(self):
    """Connect with valid cipher strings and reject a nonsensical one."""
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)

    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            conn = test_wrap_socket(raw_sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002035
def test_get_ca_certs_capath(self):
    """get_ca_certs() is empty before and has one entry after a connect.

    capath certs are loaded on request.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])

    raw_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)

    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002047
@needs_sni
def test_context_setget(self):
    """Check that the context of a connected socket can be replaced."""
    first_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    first_ctx.load_verify_locations(capath=CAPATH)
    second_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    second_ctx.load_verify_locations(capath=CAPATH)

    raw_sock = socket.socket(socket.AF_INET)
    with first_ctx.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level object report
        # the context that created them...
        self.assertIs(conn.context, first_ctx)
        self.assertIs(conn._sslobj.context, first_ctx)
        # ...and both follow the assignment of a new context.
        conn.context = second_ctx
        self.assertIs(conn.context, second_ctx)
        self.assertIs(conn._sslobj.context, second_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002063
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run ``func(*args)`` to completion, pumping data for the BIOs.

    A simple IO loop: ``func`` is called repeatedly.  After every call
    whatever is pending in ``outgoing`` is sent on ``sock``.  When the
    call asked for more input (WANT_READ), data is received from ``sock``
    and written to ``incoming`` (or EOF is signalled when nothing was
    received).  Any SSLError other than WANT_READ / WANT_WRITE propagates.

    The only keyword consulted is ``timeout`` (seconds, default 10); the
    test fails with "timeout" once that deadline has passed at the start
    of an iteration.  Returns the result of the successful ``func`` call.
    """
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        want = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            want = exc.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        sock.sendall(outgoing.read())
        # If there's no error, we're done.
        if want is None:
            break
        # For WANT_READ, we need to get data from the socket and put it in
        # the incoming BIO.
        if want == ssl.SSL_ERROR_WANT_READ:
            received = sock.recv(32768)
            if not received:
                incoming.write_eof()
            else:
                incoming.write(received)
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return ret
2100
def test_bio_handshake(self):
    """Handshake and unwrap through an SSLObject backed by memory BIOs.

    Checks what the object reports before and after the handshake, then
    unwraps and confirms that writing afterwards raises SSLError.
    """
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)

    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(bio_in, bio_out, False,
                              SIGNED_CERTFILE_HOSTNAME)

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertRaises(ValueError, sslobj.getpeercert)
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, bio_in, bio_out, sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, bio_in, bio_out, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2134
def test_bio_read_write_data(self):
    """Send a request and read the echoed reply via memory BIOs."""
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)

    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(bio_in, bio_out, False)

    def pump(func, *args):
        # Every step uses the same socket and BIO pair.
        return self.ssl_io_loop(sock, bio_in, bio_out, func, *args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002150
2151
class NetworkedTests(unittest.TestCase):
    """Tests that connect to remote hosts (guarded by transient_internet)."""

    def test_timeout_connect_ex(self):
        """connect_ex() on a timeout returns the original errno.

        Issue #12065: on a timeout, connect_ex() should return the original
        errno (mimicking the behaviour of non-SSL sockets).
        """
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # Tiny timeout so the connect attempt cannot normally finish.
            client.settimeout(0.0000001)
            result = client.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            expected = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(result, expected)

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run the certificate fetch helpers against ipv6.google.com."""
        with support.transient_internet('ipv6.google.com'):
            for check in (_test_get_server_certificate,
                          _test_get_server_certificate_fail):
                check(self, 'ipv6.google.com', 443)
2173
Martin Panter3840b2a2016-03-27 01:53:46 +00002174
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of ``host:port`` twice and fail if empty.

    The first fetch passes no ``ca_certs``; the second passes ``cert`` as
    ``ca_certs``.  ``test`` is the TestCase used to report failures.
    """
    address = (host, port)

    fetched = ssl.get_server_certificate(address)
    if not fetched:
        test.fail("No server certificate on %s:%s!" % (host, port))

    fetched = ssl.get_server_certificate(address, ca_certs=cert)
    if not fetched:
        test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, fetched))
2185
def _test_get_server_certificate_fail(test, host, port):
    """Expect fetching with CERTFILE as ``ca_certs`` to raise SSLError.

    ``test`` is the TestCase used to report a failure when a certificate
    is returned instead.
    """
    try:
        fetched = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (fetched, host, port))
2195
2196
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002197from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002198
class ThreadedEchoServer(threading.Thread):
    """Threaded echo server used as the peer for the client-side tests.

    The server thread accepts connections one at a time and runs a
    ConnectionHandler for each (the handler is joined before the next
    accept).  Handlers echo what they read in lower case and understand a
    few control words: ``over``, ``STARTTLS``, ``ENDTLS``,
    ``CB tls-unique``, ``PHA``, ``HASCERT`` and ``GETCERT``.

    Usable as a context manager: entering starts the thread and waits
    until it is listening; leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection in TLS using the server's context.

            Returns True on success.  On failure the error text is appended
            to ``server.conn_errors``, the connection is closed and False
            is returned; for errors other than an abrupt disconnect the
            whole server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, ``over`` or an error."""
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                                self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError as err:
                    # ssl.SSLError is a subclass of OSError, so TLS failures
                    # arrive here too.  Only one specific SSLError gets
                    # special treatment; every other error (SSL or not) must
                    # end this handler, otherwise the loop would keep
                    # retrying on a broken connection.
                    if (isinstance(err, ssl.SSLError) and
                            getattr(err, 'reason', None) ==
                            'PEER_DID_NOT_RETURN_A_CERTIFICATE'):
                        # On Windows sometimes test_pha_required_nocert receives the
                        # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                        # before the 'tlsv13 alert certificate required' exception.
                        # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                        # is received test_pha_required_nocert fails with ConnectionResetError
                        # because the underlying socket is closed
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the listening socket and the server-side SSL context.

        When ``context`` is given it is used as is and the certificate,
        protocol, verification, NPN/ALPN and cipher arguments are ignored.
        Otherwise a context is built from them (protocol defaults to
        PROTOCOL_TLS_SERVER, verify mode to CERT_NONE).
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results recorded by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        # Wait until run() has started listening.
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; ``flag`` (if given) is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections until stop() is called."""
        # Short accept timeout so the loop notices stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to finish."""
        self.active = False
2475
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread built on asyncore.dispatcher.

    Usable as a context manager: entering starts the thread and waits
    until its loop is about to run; leaving stops and joins it and then
    closes everything left in asyncore's socket map.
    """

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that creates a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server-side TLS connection that echoes data in lower case."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the TLS handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Process data pending in the SSL layer before reporting
                # readiness.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting when done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; try again on the next read event.
                    return None
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    connection_aborted = exc.args[0] == errno.ECONNABORTED
                    if connection_aborted:
                        return self.handle_close()
                    return None
                self._ssl_accepting = False
                return None

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                received = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(received))
                if received:
                    self.send(received.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        started = threading.Event()
        self.start(started)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; ``flag`` (if given) is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Errors raised out of the asyncore loop are deliberately
            # ignored; the loop is simply re-entered.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        """End the run() loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002593
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Run a ThreadedEchoServer built from *server_context*, connect to it
    with a socket wrapped by *client_context* and send *indata* three
    times (as given, as bytearray and as memoryview).

    Every reply must equal ``indata.lower()``; otherwise AssertionError
    is raised.  Returns a dict with details collected from the client
    socket and from the server object.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Client-side details, read while the connection is still open.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        # Server-side details, read after the client socket is closed but
        # before the server is shut down.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002643
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false
    one means it must fail.  When *expect_success* is a string it must
    additionally equal the protocol version reported by the connection.
    AssertionError is raised when the outcome does not match.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Combinations expected to fail are printed inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol)
    if (min_version is not None
        # SSLContext.minimum_version is only available on recent OpenSSL
        # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
        and hasattr(server_context, 'minimum_version')
        and server_protocol == ssl.PROTOCOL_TLS
        and server_context.minimum_version > min_version):
        # A strict OpenSSL configuration may demand a newer TLS version;
        # lower the server's minimum so old versions can be tested.
        server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if not expect_success and e.errno == errno.ECONNRESET:
            return
        raise

    # The connection succeeded.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2713
2714
2715class ThreadedTests(unittest.TestCase):
2716
def test_echo(self):
    """Echo data over SSL for each protocol, then with role-specific contexts.

    The PROTOCOL_TLS_CLIENT / PROTOCOL_TLS_SERVER contexts are first used
    in their proper roles and then in wrong roles, where an SSLError is
    expected.
    """
    if support.verbose:
        sys.stdout.write("\n")

    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol not in role_specific:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                ctx = ssl.SSLContext(protocol)
                ctx.load_cert_chain(CERTFILE)
                server_params_test(ctx, ctx,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()
    misuse_text = 'called a function you should not call'

    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    with self.subTest(client=ssl.PROTOCOL_TLS_SERVER,
                      server=ssl.PROTOCOL_TLS_CLIENT):
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context=server_context,
                               server_context=client_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
        self.assertIn(misuse_text, str(caught.exception))

    with self.subTest(client=ssl.PROTOCOL_TLS_SERVER,
                      server=ssl.PROTOCOL_TLS_SERVER):
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context=server_context,
                               server_context=server_context,
                               chatty=True, connectionchatty=True)
        self.assertIn(misuse_text, str(caught.exception))

    # NOTE(review): this subTest is labelled client=TLS_CLIENT /
    # server=TLS_CLIENT, yet server_context is passed as the client
    # context -- confirm whether that is intended.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_CLIENT):
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context=server_context,
                               server_context=client_context,
                               chatty=True, connectionchatty=True)
        self.assertIn(misuse_text, str(caught.exception))
2763
def test_getpeercert(self):
    """Check getpeercert() before and after an explicit handshake.

    Before do_handshake() the call must raise ValueError; afterwards the
    certificate must carry the expected subject and validity fields.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raises ValueError while the handshake isn't done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
        negotiated = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(peer_cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(negotiated) + '.\n')
        if 'subject' not in peer_cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(peer_cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in peer_cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, peer_cert)
        valid_from = ssl.cert_time_to_seconds(peer_cert['notBefore'])
        valid_until = ssl.cert_time_to_seconds(peer_cert['notAfter'])
        self.assertLess(valid_from, valid_until)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002799
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def wrapped_client():
        # A fresh client socket bound to the shared client context.
        return client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)

    def expect_peer_cert():
        # Start a new server, connect and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, wrapped_client() as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    # VERIFY_DEFAULT should pass
    expect_peer_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, wrapped_client() as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    expect_peer_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002841
def test_check_hostname(self):
    """Hostname checking: matching name, wrong name and missing name."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as s:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as raw_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(raw_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002874
def test_ecc_cert(self):
    """Connect to a server that presents only an ECC certificate.

    The client is limited to ECDHE/ECDSA cipher suites; the peer
    certificate must verify and, for pre-TLS 1.3 connections, the
    negotiated cipher name must start with ECDHE-ECDSA.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            # TLS 1.3 suite names (e.g. TLS_AES_256_GCM_SHA384) do not
            # encode key exchange/authentication, so the prefix check
            # only applies to older protocol versions.
            if s.version() != 'TLSv1.3':
                cipher = s.cipher()[0].split('-')
                # assertEqual, not assertTrue: the second argument of
                # assertTrue is only a failure message.  str.split()
                # returns a list, hence the list on the right-hand side.
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2895
def test_dual_rsa_ecc(self):
    """Connect to a server holding both ECC and RSA key/cert pairs.

    The client only accepts ECDSA suites and has TLS 1.3 disabled, so
    the negotiated cipher name must start with ECDHE-ECDSA.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertEqual, not assertTrue: the second argument of
            # assertTrue is only a failure message.  str.split() returns
            # a list, hence the list on the right-hand side.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2921
def test_check_hostname_idn(self):
    """Hostname checking with internationalized domain names.

    Each name is given as unicode, as an A-label string or as A-label
    bytes; server_hostname must report the expected A-label both before
    and after connecting.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways
    idn_hostnames = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),

    ]
    for requested, expected in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, context.wrap_socket(
                socket.socket(), server_hostname=requested) as s:
            self.assertEqual(s.server_hostname, expected)
            s.connect((HOST, server.port))
            peer_cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected)
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2977
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    The server requires a client certificate (CERT_REQUIRED) and the
    client offers one that is not signed by the trusted CA, so the
    connection attempt (capped at TLS 1.2) must fail.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                s.connect((HOST, server.port))
            except OSError as exc:
                # ssl.SSLError is an OSError subclass; tell the two
                # accepted outcomes apart here.
                if isinstance(exc, ssl.SSLError):
                    template = "\nSSLError is %r\n"
                elif exc.errno == errno.ECONNRESET:
                    template = "\nsocket.error is %r\n"
                else:
                    raise
                if support.verbose:
                    sys.stdout.write(template % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3014
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """TLS 1.3 variant of the rejected-client-certificate test.

    connect() itself is expected to succeed; the failure must show up
    on the following write/read.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except OSError as exc:
                # ssl.SSLError is an OSError subclass; tell the two
                # accepted outcomes apart here.
                if isinstance(exc, ssl.SSLError):
                    template = "\nSSLError is %r\n"
                elif exc.errno == errno.ECONNRESET:
                    template = "\nsocket.error is %r\n"
                else:
                    raise
                if support.verbose:
                    sys.stdout.write(template % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
3045
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        listener_ready.set()
        conn, _ = s.accept()
        conn.close()
        s.close()
        listener_gone.set()

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        # Client side: connect in plain TCP, wait for the listener to
        # drop the connection, then try to wrap the dead socket.
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003086
def test_ssl_cert_verify_error(self):
    """Check the details carried by a certificate verification error.

    The client context has no CA certificates loaded, so connecting to
    a server with a signed certificate must fail with an
    SSLCertVerificationError ("unable to get local issuer certificate",
    verify code 20).
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                msg = 'unable to get local issuer certificate'
                self.assertIsInstance(e, ssl.SSLCertVerificationError)
                self.assertEqual(e.verify_code, 20)
                self.assertEqual(e.verify_message, msg)
                self.assertIn(msg, repr(e))
                self.assertIn('certificate verify failed', repr(e))
            else:
                # Without this branch a successful connection would let
                # the test pass without checking anything.
                self.fail("Expected connection failure")
3109
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv2
    # SSLv2 client with each certificate requirement; None selects the
    # default of try_protocol_combo.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv2, True, reqs)
    try_protocol_combo(server, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(server, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003132
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_TLS
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(server, ssl.PROTOCOL_SSLv2, True)
        except OSError as err:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(err))

    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')
    # Same three combinations for each certificate requirement; None
    # selects the default of try_protocol_combo.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if have_sslv3:
            try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False, reqs)
        try_protocol_combo(server, ssl.PROTOCOL_TLS, True, reqs)
        try_protocol_combo(server, ssl.PROTOCOL_TLSv1, 'TLSv1', reqs)

    # Server with specific SSL options
    if have_sslv3:
        try_protocol_combo(server, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(server, ssl.PROTOCOL_TLS, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(server, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3170
3171
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_SSLv3
    # SSLv3 client with each certificate requirement; None selects the
    # default of try_protocol_combo.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv3, 'SSLv3', reqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(server, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(server, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(server, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3190
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    server = ssl.PROTOCOL_TLSv1
    # TLSv1 client with each certificate requirement; None selects the
    # default of try_protocol_combo.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(server, ssl.PROTOCOL_TLSv1, 'TLSv1', reqs)
    # Legacy SSL clients, where this build still provides them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(server, getattr(ssl, legacy), False)
    try_protocol_combo(server, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3204
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tls11 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tls11, tls11, 'TLSv1.1')
    # Legacy SSL clients, where this build still provides them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tls11, getattr(ssl, legacy), False)
    try_protocol_combo(tls11, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tls11, 'TLSv1.1')
    try_protocol_combo(tls11, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tls11, False)
3223
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    # Both ends on TLSv1.2, each with SSLv2 and SSLv3 disabled via options.
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy SSL protocols are only paired when the ssl module has them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)
    # Auto-negotiating peer with TLSv1.2 switched off through its options.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # Remaining pairings, as (first protocol, second protocol, expected).
    remaining = (
        (ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2'),
        (tlsv1_2, ssl.PROTOCOL_TLSv1, False),
        (ssl.PROTOCOL_TLSv1, tlsv1_2, False),
        (tlsv1_2, ssl.PROTOCOL_TLSv1_1, False),
        (ssl.PROTOCOL_TLSv1_1, tlsv1_2, False),
    )
    for server_proto, client_proto, expected in remaining:
        try_protocol_combo(server_proto, client_proto, expected)
3246
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""

    def report(template, *values):
        # Progress output is only written in verbose test runs.
        if support.verbose:
            sys.stdout.write(template % values)

    server = ThreadedEchoServer(CERTFILE, starttls_server=True,
                                chatty=True, connectionchatty=True)
    # True while traffic goes through the TLS-wrapped socket ``conn``
    # instead of the plain socket ``s``.
    secured = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        report("\n")
        for indata in (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3",
                       b"msg 4", b"ENDTLS", b"msg 5", b"msg 6"):
            report(" client: sending %r...\n", indata)
            if secured:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                report(" client: read %r from server, starting TLS...\n",
                       msg)
                conn = test_wrap_socket(s)
                secured = True
            elif acknowledged and indata == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                report(" client: read %r from server, ending TLS...\n",
                       msg)
                s = conn.unwrap()
                secured = False
            else:
                report(" client: read %r from server\n", msg)
        report(" client: closing connection.\n")
        # Say goodbye and close whichever socket is currently in use.
        if secured:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003303
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Start from empty *bytes*: d1 is bytes, and comparing bytes with a str
    # placeholder raises BytesWarning under -bb instead of failing cleanly
    # when no body is read below.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response is closed on leaving the block, even if a read fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003329
def test_asyncore_server(self):
    """Check the example asyncore integration."""

    def report(text):
        # Progress output is only written in verbose test runs.
        if support.verbose:
            sys.stdout.write(text)

    report("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        # The reply is compared against the lower-cased request.
        if outdata != indata.lower():
            received = (outdata[:20], len(outdata))
            wanted = (indata[:20].lower(), len(indata))
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (received + wanted))
        s.write(b"over\n")
        report(" client: closing connection.\n")
        s.close()
        report(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003358
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, _ = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the ``!r`` / ``!s``
        # *conversions*.  Spelling them as format specs (``:r`` / ``:s``)
        # makes bytes.__format__ / ValueError.__format__ raise TypeError,
        # which would hide the real test failure.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                # The reply is compared against the lower-cased payload.
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was already sent before the
                # receive method raised, so its reply is still pending
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3495
def test_recv_zero(self):
    # Zero-length reads on a TLS socket: recv(0) / read(0) return b"" and
    # leave pending data untouched, also on a non-blocking socket.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    self.addCleanup(server.__exit__, None, None)
    raw_sock = socket.create_connection((HOST, server.port))
    self.addCleanup(raw_sock.close)
    s = test_wrap_socket(raw_sock, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    for zero_length_read in (s.recv, s.read):
        self.assertEqual(zero_length_read(0), b"")
    # ... and the echoed payload is still there for a normal read.
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    empty_buffer = bytearray()
    self.assertEqual(s.recv_into(empty_buffer), 0)
3515
def test_nonblocking_send(self):
    # send() on a non-blocking TLS socket must eventually raise
    # SSLWantWriteError or SSLWantReadError rather than block.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                s.send(chunk)

        # Now read all the output and discard it
        s.setblocking(True)
        s.close()
3545
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    stop_requested = False

    def serve():
        # Accept plain TCP connections and never start a TLS handshake.
        listener.listen()
        listening.set()
        accepted = []
        while not stop_requested:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for sock in accepted:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: the timeout is set on the plain socket, which is
        # connected before being wrapped.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: the socket is wrapped first, then given the timeout and
        # connected.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Let serve() leave its loop, then clean up.
        stop_requested = True
        worker.join()
        listener.close()
3594
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    host = "127.0.0.1"
    plain_listener = socket.socket(socket.AF_INET)
    port = support.bind_port(plain_listener)
    server = context.wrap_socket(plain_listener, server_side=True)
    self.assertTrue(server.server_side)

    ready = threading.Event()
    # Filled in by the server thread once accept() returns.
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        remote, peer = server.accept()
        # Echo four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3635
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    unconnected = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3642
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    unconnected = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3649
def test_no_shared_ciphers(self):
    # A handshake between peers restricted to different cipher suites
    # must fail, and the server must record "no shared cipher".
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    for ctx, suite in ((client_context, "AES128"),
                       (server_context, "AES256")):
        ctx.set_ciphers(suite)
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3663
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is neither a version nor an SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(negotiated, 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(negotiated, 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(negotiated, ('TLSv1', 'TLSv1.2'))
        # Once the socket is closed, both are None again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3687
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled on a context shared by both ends, the
    # connection must report TLSv1.3 and one of the listed cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    older_tls_off = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    context.options |= older_tls_off
    accepted_ciphers = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn(cipher_name, accepted_ciphers)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003705
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check the version negotiated for several combinations of
    # SSLContext.minimum_version / maximum_version on client and server.
    client_context, server_context, hostname = testing_context()
    # client TLSv1.0 to 1.2
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = ssl.TLSVersion.TLSv1
    server_context.maximum_version = ssl.TLSVersion.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = ssl.TLSVersion.TLSv1_2
    server_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # Pin both client bounds to TLSv1 so the scenario is explicit
    # (previously maximum_version was simply assigned twice).
    client_context.minimum_version = ssl.TLSVersion.TLSv1
    client_context.maximum_version = ssl.TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(e.exception))
3744
3745
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # Server lowers its minimum to SSLv3; the client is pinned to SSLv3
    # only, so the connection must report SSLv3.
    sslv3 = ssl.TLSVersion.SSLv3
    client_context, server_context, hostname = testing_context()
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3759
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3779
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(cb_data), 48)
            else:
                self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the data of *this* connection; re-checking the
            # first connection's cb_data here would validate nothing new.
            self.assertIsNotNone(new_cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(new_cb_data), 48)
            else:
                self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003839
def test_compression(self):
    # The reported compression must be None, 'ZLIB' or 'RLE'.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    self.assertIn(compression, {None, 'ZLIB', 'RLE'})
3848
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both contexts, no compression must
    # be reported.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3859
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    cipher = stats["cipher"][0]
    # The cipher name is split on "-" and must contain a DH component.
    parts = cipher.split("-")
    if not any(tag in parts for tag in ("ADH", "EDH", "DHE")):
        # Report the whole cipher name; ``cipher`` is already a string, so
        # indexing it again would show only its first character.
        self.fail("Non-DH cipher: " + cipher)
3876
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Server-side settings shared by all three scenarios below.
    ecdhe_ciphers = "ECDHE:!eNULL:!aNULL"
    old_tls_off = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_ciphers)
    server_context.options |= old_tls_off
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_ciphers)
    server_context.options |= old_tls_off
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_ciphers)
    server_context.options |= old_tls_off
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        # The handshake failed, which is the wanted outcome.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3915
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3923
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server context is given ALPN protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3933
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check the ALPN protocol reported by
    # the client and by the server against the expected one.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            # Keep the error so the branch below can inspect it.
            stats = e

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The last two placeholders are filled in per side below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_alpn_protocols']
        if len(server_seen):
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3973
def test_selected_npn_protocol(self):
    # With no NPN configured, the handshake statistics must report that
    # the client selected no NPN protocol.
    client_ctx, server_ctx, sni_host = testing_context()
    result = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True,
        sni_name=sni_host)
    self.assertIsNone(result['client_npn_protocol'])
3981
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Each case pairs the list configured on the client with the
    # protocol both peers are expected to report after the handshake.
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(
            client_context, server_context,
            chatty=True, connectionchatty=True,
            sni_name=hostname)

        # The two remaining placeholders are filled in per peer below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The server side records a list; only its last entry matters.
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
4007
def sni_contexts(self):
    # Return (server_context, other_context, client_context): two server
    # contexts loaded with different certificate chains, plus a client
    # context that trusts SIGNING_CA.
    def server_context_for(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_context_for(SIGNED_CERTFILE)
    alternate = server_context_for(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
4016
def check_common_name(self, stats, name):
    # Assert that the subject of the peer certificate recorded in
    # *stats* contains a commonName entry equal to *name*.
    subject = stats['peercert']['subject']
    wanted = (('commonName', name),)
    self.assertIn(wanted, subject)
4020
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()
    client_context.check_hostname = False

    # Every invocation of the servername callback is recorded here as a
    # (server_name, initial_context) pair.
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        # A name was sent: serve the connection from the other context.
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname reached the callback, and the certificate presented
    # is the one loaded into other_context (presumably 'fakehostname'
    # is the commonName of SIGNED_CERTFILE2).
    self.assertEqual(calls, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without SNI the callback still runs, with server_name=None, and
    # the original certificate stays in place.
    calls.clear()
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Once the callback is removed it must not be invoked any more and
    # the certificate must not change.
    calls.clear()
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(calls, [])
4061
@needs_sni
def test_sni_callback_alert(self):
    # An alert code returned by the servername callback must surface on
    # the connecting client as the matching SSLError reason.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004075
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback must fail the
    # connection with a handshake failure alert, and the exception must
    # be reported through the unraisable hook.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0
    server_context.set_servername_callback(cb_raising)

    # The assertions stay inside the outer block because they read the
    # state held by the catch_unraisable_exception() manager.
    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004094
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning a value of the wrong type must
    # terminate the TLS connection with an internal error alert, and a
    # TypeError must be reported through the unraisable hook.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # The assertions stay inside the outer block because they read the
    # state held by the catch_unraisable_exception() manager.
    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

        self.assertEqual(caught.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004114
def test_shared_ciphers(self):
    # The client offers AES128 and AES256 while the server only allows
    # AES256, so every shared cipher reported by the server must match
    # one of the expected algorithm markers.
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")
    expected_algs = [
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    ]

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _protocol, _bits in shared:
        for marker in expected_algs:
            if marker in cipher_name:
                break
        else:
            # No expected marker occurs in this cipher's name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004132
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        sock = client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)
        sock.connect((HOST, server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004145
def test_sendfile(self):
    # Send a file through SSLSocket.sendfile() to the echo server and
    # check that the same bytes come back.
    TEST_DATA = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(TEST_DATA)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context is used for both the server and the client socket.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as payload:
            conn.sendfile(payload)
            self.assertEqual(conn.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004162
def test_session(self):
    # Run four handshakes against the same server context, alternating
    # between fresh and resumed sessions, and check both the session
    # objects and the server's accept/hits counters after each one.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_counters(accepted, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accepted)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    stats = handshake()
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_counters(accepted=1, hits=0)

    # reuse session
    stats = handshake(session=session)
    check_counters(accepted=2, hits=1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, session.id)
    # Equal to the original session, yet a distinct object.
    self.assertEqual(resumed, session)
    self.assertIsNot(resumed, session)
    self.assertGreaterEqual(resumed.time, session.time)
    self.assertGreaterEqual(resumed.timeout, session.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, session.id)
    self.assertNotEqual(fresh, session)
    check_counters(accepted=3, hits=1)

    # reuse session again
    stats = handshake(session=session)
    self.assertTrue(stats['session_reused'])
    resumed_again = stats['session']
    self.assertEqual(resumed_again.id, session.id)
    self.assertEqual(resumed_again, session)
    self.assertGreaterEqual(resumed_again.time, session.time)
    self.assertGreaterEqual(resumed_again.timeout, session.timeout)
    check_counters(accepted=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004220
def test_session_handling(self):
    # Exercise the SSLSocket.session attribute: its value before the
    # handshake, type checking in the setter, the ban on setting it
    # after the handshake, successful reuse, and rejection of a session
    # that belongs to another SSLContext.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            # only SSLSession objects may be assigned
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext.  The
            # assignment itself is what raises, so it is the only
            # statement inside the assertRaises block; a connect()
            # placed after it would never be reached.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004270
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004271
Christian Heimes9fb051f2018-09-23 08:32:31 +02004272@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4273class TestPostHandshakeAuth(unittest.TestCase):
4274 def test_pha_setter(self):
4275 protocols = [
4276 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4277 ]
4278 for protocol in protocols:
4279 ctx = ssl.SSLContext(protocol)
4280 self.assertEqual(ctx.post_handshake_auth, False)
4281
4282 ctx.post_handshake_auth = True
4283 self.assertEqual(ctx.post_handshake_auth, True)
4284
4285 ctx.verify_mode = ssl.CERT_REQUIRED
4286 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4287 self.assertEqual(ctx.post_handshake_auth, True)
4288
4289 ctx.post_handshake_auth = False
4290 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4291 self.assertEqual(ctx.post_handshake_auth, False)
4292
4293 ctx.verify_mode = ssl.CERT_OPTIONAL
4294 ctx.post_handshake_auth = True
4295 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4296 self.assertEqual(ctx.post_handshake_auth, True)
4297
4298 def test_pha_required(self):
4299 client_context, server_context, hostname = testing_context()
4300 server_context.post_handshake_auth = True
4301 server_context.verify_mode = ssl.CERT_REQUIRED
4302 client_context.post_handshake_auth = True
4303 client_context.load_cert_chain(SIGNED_CERTFILE)
4304
4305 server = ThreadedEchoServer(context=server_context, chatty=False)
4306 with server:
4307 with client_context.wrap_socket(socket.socket(),
4308 server_hostname=hostname) as s:
4309 s.connect((HOST, server.port))
4310 s.write(b'HASCERT')
4311 self.assertEqual(s.recv(1024), b'FALSE\n')
4312 s.write(b'PHA')
4313 self.assertEqual(s.recv(1024), b'OK\n')
4314 s.write(b'HASCERT')
4315 self.assertEqual(s.recv(1024), b'TRUE\n')
4316 # PHA method just returns true when cert is already available
4317 s.write(b'PHA')
4318 self.assertEqual(s.recv(1024), b'OK\n')
4319 s.write(b'GETCERT')
4320 cert_text = s.recv(4096).decode('us-ascii')
4321 self.assertIn('Python Software Foundation CA', cert_text)
4322
4323 def test_pha_required_nocert(self):
4324 client_context, server_context, hostname = testing_context()
4325 server_context.post_handshake_auth = True
4326 server_context.verify_mode = ssl.CERT_REQUIRED
4327 client_context.post_handshake_auth = True
4328
4329 server = ThreadedEchoServer(context=server_context, chatty=False)
4330 with server:
4331 with client_context.wrap_socket(socket.socket(),
4332 server_hostname=hostname) as s:
4333 s.connect((HOST, server.port))
4334 s.write(b'PHA')
4335 # receive CertificateRequest
4336 self.assertEqual(s.recv(1024), b'OK\n')
4337 # send empty Certificate + Finish
4338 s.write(b'HASCERT')
4339 # receive alert
4340 with self.assertRaisesRegex(
4341 ssl.SSLError,
4342 'tlsv13 alert certificate required'):
4343 s.recv(1024)
4344
4345 def test_pha_optional(self):
4346 if support.verbose:
4347 sys.stdout.write("\n")
4348
4349 client_context, server_context, hostname = testing_context()
4350 server_context.post_handshake_auth = True
4351 server_context.verify_mode = ssl.CERT_REQUIRED
4352 client_context.post_handshake_auth = True
4353 client_context.load_cert_chain(SIGNED_CERTFILE)
4354
4355 # check CERT_OPTIONAL
4356 server_context.verify_mode = ssl.CERT_OPTIONAL
4357 server = ThreadedEchoServer(context=server_context, chatty=False)
4358 with server:
4359 with client_context.wrap_socket(socket.socket(),
4360 server_hostname=hostname) as s:
4361 s.connect((HOST, server.port))
4362 s.write(b'HASCERT')
4363 self.assertEqual(s.recv(1024), b'FALSE\n')
4364 s.write(b'PHA')
4365 self.assertEqual(s.recv(1024), b'OK\n')
4366 s.write(b'HASCERT')
4367 self.assertEqual(s.recv(1024), b'TRUE\n')
4368
4369 def test_pha_optional_nocert(self):
4370 if support.verbose:
4371 sys.stdout.write("\n")
4372
4373 client_context, server_context, hostname = testing_context()
4374 server_context.post_handshake_auth = True
4375 server_context.verify_mode = ssl.CERT_OPTIONAL
4376 client_context.post_handshake_auth = True
4377
4378 server = ThreadedEchoServer(context=server_context, chatty=False)
4379 with server:
4380 with client_context.wrap_socket(socket.socket(),
4381 server_hostname=hostname) as s:
4382 s.connect((HOST, server.port))
4383 s.write(b'HASCERT')
4384 self.assertEqual(s.recv(1024), b'FALSE\n')
4385 s.write(b'PHA')
4386 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004387 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004388 s.write(b'HASCERT')
4389 self.assertEqual(s.recv(1024), b'FALSE\n')
4390
4391 def test_pha_no_pha_client(self):
4392 client_context, server_context, hostname = testing_context()
4393 server_context.post_handshake_auth = True
4394 server_context.verify_mode = ssl.CERT_REQUIRED
4395 client_context.load_cert_chain(SIGNED_CERTFILE)
4396
4397 server = ThreadedEchoServer(context=server_context, chatty=False)
4398 with server:
4399 with client_context.wrap_socket(socket.socket(),
4400 server_hostname=hostname) as s:
4401 s.connect((HOST, server.port))
4402 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4403 s.verify_client_post_handshake()
4404 s.write(b'PHA')
4405 self.assertIn(b'extension not received', s.recv(1024))
4406
4407 def test_pha_no_pha_server(self):
4408 # server doesn't have PHA enabled, cert is requested in handshake
4409 client_context, server_context, hostname = testing_context()
4410 server_context.verify_mode = ssl.CERT_REQUIRED
4411 client_context.post_handshake_auth = True
4412 client_context.load_cert_chain(SIGNED_CERTFILE)
4413
4414 server = ThreadedEchoServer(context=server_context, chatty=False)
4415 with server:
4416 with client_context.wrap_socket(socket.socket(),
4417 server_hostname=hostname) as s:
4418 s.connect((HOST, server.port))
4419 s.write(b'HASCERT')
4420 self.assertEqual(s.recv(1024), b'TRUE\n')
4421 # PHA doesn't fail if there is already a cert
4422 s.write(b'PHA')
4423 self.assertEqual(s.recv(1024), b'OK\n')
4424 s.write(b'HASCERT')
4425 self.assertEqual(s.recv(1024), b'TRUE\n')
4426
4427 def test_pha_not_tls13(self):
4428 # TLS 1.2
4429 client_context, server_context, hostname = testing_context()
4430 server_context.verify_mode = ssl.CERT_REQUIRED
4431 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4432 client_context.post_handshake_auth = True
4433 client_context.load_cert_chain(SIGNED_CERTFILE)
4434
4435 server = ThreadedEchoServer(context=server_context, chatty=False)
4436 with server:
4437 with client_context.wrap_socket(socket.socket(),
4438 server_hostname=hostname) as s:
4439 s.connect((HOST, server.port))
4440 # PHA fails for TLS != 1.3
4441 s.write(b'PHA')
4442 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4443
Christian Heimesf0f59302019-07-01 08:29:17 +02004444 def test_bpo37428_pha_cert_none(self):
4445 # verify that post_handshake_auth does not implicitly enable cert
4446 # validation.
4447 hostname = SIGNED_CERTFILE_HOSTNAME
4448 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4449 client_context.post_handshake_auth = True
4450 client_context.load_cert_chain(SIGNED_CERTFILE)
4451 # no cert validation and CA on client side
4452 client_context.check_hostname = False
4453 client_context.verify_mode = ssl.CERT_NONE
4454
4455 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
4456 server_context.load_cert_chain(SIGNED_CERTFILE)
4457 server_context.load_verify_locations(SIGNING_CA)
4458 server_context.post_handshake_auth = True
4459 server_context.verify_mode = ssl.CERT_REQUIRED
4460
4461 server = ThreadedEchoServer(context=server_context, chatty=False)
4462 with server:
4463 with client_context.wrap_socket(socket.socket(),
4464 server_hostname=hostname) as s:
4465 s.connect((HOST, server.port))
4466 s.write(b'HASCERT')
4467 self.assertEqual(s.recv(1024), b'FALSE\n')
4468 s.write(b'PHA')
4469 self.assertEqual(s.recv(1024), b'OK\n')
4470 s.write(b'HASCERT')
4471 self.assertEqual(s.recv(1024), b'TRUE\n')
4472 # server cert has not been validated
4473 self.assertEqual(s.getpeercert(), {})
4474
Christian Heimes9fb051f2018-09-23 08:32:31 +02004475
# True when SSLContext exposes the keylog_filename attribute.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
# Decorator that skips a test when key logging is not available.
requires_keylog = unittest.skipUnless(
    HAS_KEYLOG,
    'test requires OpenSSL 1.1.1 with keylog callback',
)
4479
class TestSSLDebug(unittest.TestCase):
    # Tests for the debugging hooks of SSLContext: the key log file and
    # the private _msg_callback attribute.

    def keylog_lines(self, fname=support.TESTFN):
        # Return the number of lines currently in the file *fname*.
        with open(fname) as keylog:
            return sum(1 for _ in keylog)

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_defaults(self):
        self.addCleanup(support.unlink, support.TESTFN)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertEqual(ctx.keylog_filename, None)

        # Assigning a filename creates the file, which then holds
        # exactly one line.
        self.assertFalse(os.path.isfile(support.TESTFN))
        ctx.keylog_filename = support.TESTFN
        self.assertEqual(ctx.keylog_filename, support.TESTFN)
        self.assertTrue(os.path.isfile(support.TESTFN))
        self.assertEqual(self.keylog_lines(), 1)

        ctx.keylog_filename = None
        self.assertEqual(ctx.keylog_filename, None)

        # A directory is not an acceptable key log target.
        containing_dir = os.path.dirname(os.path.abspath(support.TESTFN))
        with self.assertRaises((IsADirectoryError, PermissionError)):
            # Windows raises PermissionError
            ctx.keylog_filename = containing_dir

        with self.assertRaises(TypeError):
            ctx.keylog_filename = 1

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_filename(self):
        self.addCleanup(support.unlink, support.TESTFN)
        client_context, server_context, hostname = testing_context()

        def handshake_once():
            # Start a fresh echo server and connect to it exactly once.
            server = ThreadedEchoServer(context=server_context, chatty=False)
            with server, \
                 client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as conn:
                conn.connect((HOST, server.port))

        # Only the client logs.
        client_context.keylog_filename = support.TESTFN
        handshake_once()
        # header, 5 lines for TLS 1.3
        self.assertEqual(self.keylog_lines(), 6)

        # Only the server logs, into the same file.
        client_context.keylog_filename = None
        server_context.keylog_filename = support.TESTFN
        handshake_once()
        self.assertGreaterEqual(self.keylog_lines(), 11)

        # Both peers log into the same file.
        client_context.keylog_filename = support.TESTFN
        server_context.keylog_filename = support.TESTFN
        handshake_once()
        self.assertGreaterEqual(self.keylog_lines(), 21)

        client_context.keylog_filename = None
        server_context.keylog_filename = None

    @requires_keylog
    @unittest.skipIf(sys.flags.ignore_environment,
                     "test is not compatible with ignore_environment")
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_env(self):
        self.addCleanup(support.unlink, support.TESTFN)
        # patch.dict() restores os.environ when the block exits.
        with unittest.mock.patch.dict(os.environ):
            os.environ['SSLKEYLOGFILE'] = support.TESTFN
            self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)

            # A plain SSLContext does not pick up the variable...
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertEqual(ctx.keylog_filename, None)

            # ...whereas both context factory functions do.
            ctx = ssl.create_default_context()
            self.assertEqual(ctx.keylog_filename, support.TESTFN)

            ctx = ssl._create_stdlib_context()
            self.assertEqual(ctx.keylog_filename, support.TESTFN)

    def test_msg_callback(self):
        # _msg_callback starts as None, accepts a function and rejects
        # an arbitrary object with TypeError.
        client_context, server_context, hostname = testing_context()

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            pass

        self.assertIs(client_context._msg_callback, None)
        client_context._msg_callback = msg_cb
        self.assertIs(client_context._msg_callback, msg_cb)
        with self.assertRaises(TypeError):
            client_context._msg_callback = object()

    def test_msg_callback_tls12(self):
        client_context, server_context, hostname = testing_context()
        client_context.options |= ssl.OP_NO_TLSv1_3

        # (direction, version, content_type, msg_type) of every message
        # passed to the callback.
        observed = []

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            self.assertIsInstance(conn, ssl.SSLSocket)
            self.assertIsInstance(data, bytes)
            self.assertIn(direction, {'read', 'write'})
            observed.append((direction, version, content_type, msg_type))

        client_context._msg_callback = msg_cb

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, \
             client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as conn:
            conn.connect((HOST, server.port))

        server_key_exchange = (
            "read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
            _TLSMessageType.SERVER_KEY_EXCHANGE)
        self.assertIn(server_key_exchange, observed)

        change_cipher_spec = (
            "write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
            _TLSMessageType.CHANGE_CIPHER_SPEC)
        self.assertIn(change_cipher_spec, observed)
4608
def test_main(verbose=False):
    """Run the SSL test classes, printing build details first when verbose.

    The *verbose* parameter is accepted but not consulted; whether the
    banner is printed depends on ``support.verbose``.

    Raises support.TestFailed if any of the certificate/key files the tests
    rely on does not exist.
    """
    if support.verbose:
        # Try the platform-specific version probes in turn; the first one
        # that reports a non-empty leading field wins, otherwise fall back
        # to the generic platform string.
        version_probes = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        for label, probe in version_probes.items():
            details = probe()
            if details and details[0]:
                plat = '%s %r' % (label, details)
                break
        else:
            plat = repr(platform.platform())

        openssl_build = (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)
        print("test_ssl: testing with %r %r" % openssl_build)
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # The constant may be missing from the ssl module; skip the line then.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    # Fail early, with a clear message, if a fixture file is missing.
    required_files = [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for path in required_files:
        if not os.path.exists(path):
            raise support.TestFailed("Can't read certificate file %r" % path)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug,
    ]

    # Tests that need the 'network' resource are added only when enabled.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Pair threading_setup() with threading_cleanup() even if the run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004654
# Run the suite only when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()