blob: b6794ce3a81748e811af970bbc183ee944d323c6 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
# Obtain the ssl module through test.support's import helper.
ssl = support.import_module("ssl")


# Sorted keys of ssl's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# True when the linked library identifies itself as LibreSSL.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# Version gates; always False on LibreSSL regardless of its version tuple.
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build-time config variable; sysconfig returns None when it is not set.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file stored next to this module.

    The given path components are joined onto the directory that
    contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Each BYTES_* constant is the os.fsencode()d (bytes) form of the
# corresponding str path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")

# Expected result of decoding CERTFILE (compared in test_parse_cert).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
# Expected result of decoding SIGNED_CERTFILE (compared in test_parse_cert).
# NOTE(review): asn1time() in this file notes that ASN1_TIME_print() pads
# single-digit days with a leading space; confirm the spacing inside the
# 'notAfter' value against the real decoder output.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Jul 7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
99
# Second signed certificate and an ECC certificate, each paired with the
# hostname used for it in the tests.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately empty, malformed or otherwise special certificate fixtures.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)

# Not defined in all versions of OpenSSL
# (each falls back to 0 when the ssl module lacks the attribute)
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write *prefix* followed by the current traceback to stdout.

    The traceback of the exception currently being handled is always
    formatted, but it is only written when ``support.verbose`` is set.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted_lines = traceback.format_exception(exc_type, exc_value, exc_tb)
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return minimum_api <= ssl._OPENSSL_API_VERSION
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL library version is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return minimum_version <= ssl.OPENSSL_VERSION_INFO
144
def have_verify_flags():
    """Return True when the OpenSSL library version is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return minimum_version <= ssl.OPENSSL_VERSION_INFO
148
def _have_secp_curves():
    """Return True when a server context accepts the secp384r1 ECDH curve.

    Returns False when ssl reports no ECDH support, or when selecting
    the curve raises ValueError.
    """
    if ssl.HAS_ECDH:
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            server_ctx.set_ecdh_curve("secp384r1")
        except ValueError:
            return False
        return True
    return False


# Probe once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset():  #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
169
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    Some versions of OpenSSL ignore seconds, see #18207.  For that exact
    API version the seconds are zeroed; any other version gets
    *cert_time* back unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    adjusted = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator: skip *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned
    unchanged.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises ssl.SSLError on the known-broken
    OpenSSL/distribution combination, unittest.SkipTest is raised.
    In every other case *func* is called normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # platform.linux_distribution() is deprecated and absent from
            # newer Python versions; look it up defensively so this path
            # cannot fail with AttributeError.
            linux_distribution = getattr(platform, 'linux_distribution', None)
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    linux_distribution is not None and
                    linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext.

    A context is created for *ssl_version* and configured from the
    keyword-only options that are not None; the remaining *kwargs* are
    forwarded to SSLContext.wrap_socket(), whose result is returned.
    """
    ctx = ssl.SSLContext(ssl_version)

    if cert_reqs is not None:
        # Hostname checking has to be off before verification is disabled.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs

    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)

    wants_cert_chain = not (certfile is None and keyfile is None)
    if wants_cert_chain:
        ctx.load_cert_chain(certfile, keyfile)

    if ciphers is not None:
        ctx.set_ciphers(ciphers)

    return ctx.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching client/server context pair for tests.

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one paired
    with the chosen certificate.
    """
    if server_cert not in (SIGNED_CERTFILE, SIGNED_CERTFILE2):
        raise ValueError(server_cert)
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    else:
        hostname = SIGNED_CERTFILE2_HOSTNAME

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
Antoine Pitrou480a1242010-04-28 21:37:09 +0000279 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000282 sys.stdout.write("\n RAND_status is %d (%s)\n"
283 % (v, (v and "sufficient randomness") or
284 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
286 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
287 self.assertEqual(len(data), 16)
288 self.assertEqual(is_cryptographic, v == 1)
289 if v:
290 data = ssl.RAND_bytes(16)
291 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200292 else:
293 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200294
Victor Stinner1e81a392013-12-19 16:47:04 +0100295 # negative num is invalid
296 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
297 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
298
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100299 if hasattr(ssl, 'RAND_egd'):
300 self.assertRaises(TypeError, ssl.RAND_egd, 1)
301 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000302 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200303 ssl.RAND_add(b"this is a random bytes object", 75.0)
304 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
Christian Heimesf77b4b22013-08-21 13:26:05 +0200306 @unittest.skipUnless(os.name == 'posix', 'requires posix')
307 def test_random_fork(self):
308 status = ssl.RAND_status()
309 if not status:
310 self.fail("OpenSSL's PRNG has insufficient randomness")
311
312 rfd, wfd = os.pipe()
313 pid = os.fork()
314 if pid == 0:
315 try:
316 os.close(rfd)
317 child_random = ssl.RAND_pseudo_bytes(16)[0]
318 self.assertEqual(len(child_random), 16)
319 os.write(wfd, child_random)
320 os.close(wfd)
321 except BaseException:
322 os._exit(1)
323 else:
324 os._exit(0)
325 else:
326 os.close(wfd)
327 self.addCleanup(os.close, rfd)
328 _, status = os.waitpid(pid, 0)
329 self.assertEqual(status, 0)
330
331 child_random = os.read(rfd, 16)
332 self.assertEqual(len(child_random), 16)
333 parent_random = ssl.RAND_pseudo_bytes(16)[0]
334 self.assertEqual(len(parent_random), 16)
335
336 self.assertNotEqual(child_random, parent_random)
337
    # unittest setting: None removes the length limit on failure diffs.
    maxDiff = None
339
Antoine Pitrou480a1242010-04-28 21:37:09 +0000340 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000341 # note that this uses an 'unofficial' function in _ssl.c,
342 # provided solely for this test, to exercise the certificate
343 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100344 self.assertEqual(
345 ssl._ssl._test_decode_cert(CERTFILE),
346 CERTFILE_INFO
347 )
348 self.assertEqual(
349 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
350 SIGNED_CERTFILE_INFO
351 )
352
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200353 # Issue #13034: the subjectAltName in some certificates
354 # (notably projects.developer.nokia.com:443) wasn't parsed
355 p = ssl._ssl._test_decode_cert(NOKIACERT)
356 if support.verbose:
357 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
358 self.assertEqual(p['subjectAltName'],
359 (('DNS', 'projects.developer.nokia.com'),
360 ('DNS', 'projects.forum.nokia.com'))
361 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100362 # extra OCSP and AIA fields
363 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
364 self.assertEqual(p['caIssuers'],
365 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
366 self.assertEqual(p['crlDistributionPoints'],
367 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000368
Miss Islington (bot)be5de952019-01-15 15:03:36 -0800369 def test_parse_cert_CVE_2019_5010(self):
370 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
371 if support.verbose:
372 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
373 self.assertEqual(
374 p,
375 {
376 'issuer': (
377 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
378 'notAfter': 'Jun 14 18:00:58 2028 GMT',
379 'notBefore': 'Jun 18 18:00:58 2018 GMT',
380 'serialNumber': '02',
381 'subject': ((('countryName', 'UK'),),
382 (('commonName',
383 'codenomicon-vm-2.test.lal.cisco.com'),)),
384 'subjectAltName': (
385 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
386 'version': 3
387 }
388 )
389
Christian Heimes824f7f32013-08-17 00:54:47 +0200390 def test_parse_cert_CVE_2013_4238(self):
391 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
392 if support.verbose:
393 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
394 subject = ((('countryName', 'US'),),
395 (('stateOrProvinceName', 'Oregon'),),
396 (('localityName', 'Beaverton'),),
397 (('organizationName', 'Python Software Foundation'),),
398 (('organizationalUnitName', 'Python Core Development'),),
399 (('commonName', 'null.python.org\x00example.org'),),
400 (('emailAddress', 'python-dev@python.org'),))
401 self.assertEqual(p['subject'], subject)
402 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200403 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
404 san = (('DNS', 'altnull.python.org\x00example.com'),
405 ('email', 'null@python.org\x00user@example.org'),
406 ('URI', 'http://null.python.org\x00http://example.org'),
407 ('IP Address', '192.0.2.1'),
408 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
409 else:
410 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
411 san = (('DNS', 'altnull.python.org\x00example.com'),
412 ('email', 'null@python.org\x00user@example.org'),
413 ('URI', 'http://null.python.org\x00http://example.org'),
414 ('IP Address', '192.0.2.1'),
415 ('IP Address', '<invalid>'))
416
417 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200418
Christian Heimes1c03abd2016-09-06 23:25:35 +0200419 def test_parse_all_sans(self):
420 p = ssl._ssl._test_decode_cert(ALLSANFILE)
421 self.assertEqual(p['subjectAltName'],
422 (
423 ('DNS', 'allsans'),
424 ('othername', '<unsupported>'),
425 ('othername', '<unsupported>'),
426 ('email', 'user@example.org'),
427 ('DNS', 'www.example.org'),
428 ('DirName',
429 ((('countryName', 'XY'),),
430 (('localityName', 'Castle Anthrax'),),
431 (('organizationName', 'Python Software Foundation'),),
432 (('commonName', 'dirname example'),))),
433 ('URI', 'https://www.python.org/'),
434 ('IP Address', '127.0.0.1'),
435 ('IP Address', '0:0:0:0:0:0:0:1\n'),
436 ('Registered ID', '1.2.3.4.5')
437 )
438 )
439
Antoine Pitrou480a1242010-04-28 21:37:09 +0000440 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000441 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000442 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000443 d1 = ssl.PEM_cert_to_DER_cert(pem)
444 p2 = ssl.DER_cert_to_PEM_cert(d1)
445 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000446 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000447 if not p2.startswith(ssl.PEM_HEADER + '\n'):
448 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
449 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
450 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000451
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000452 def test_openssl_version(self):
453 n = ssl.OPENSSL_VERSION_NUMBER
454 t = ssl.OPENSSL_VERSION_INFO
455 s = ssl.OPENSSL_VERSION
456 self.assertIsInstance(n, int)
457 self.assertIsInstance(t, tuple)
458 self.assertIsInstance(s, str)
459 # Some sanity checks follow
460 # >= 0.9
461 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400462 # < 3.0
463 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000464 major, minor, fix, patch, status = t
465 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400466 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000467 self.assertGreaterEqual(minor, 0)
468 self.assertLess(minor, 256)
469 self.assertGreaterEqual(fix, 0)
470 self.assertLess(fix, 256)
471 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100472 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000473 self.assertGreaterEqual(status, 0)
474 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400475 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200476 if IS_LIBRESSL:
477 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100478 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400479 else:
480 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100481 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000482
Antoine Pitrou9d543662010-04-23 23:10:32 +0000483 @support.cpython_only
484 def test_refcycle(self):
485 # Issue #7943: an SSL object doesn't create reference cycles with
486 # itself.
487 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200488 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000489 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100490 with support.check_warnings(("", ResourceWarning)):
491 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100492 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000493
Antoine Pitroua468adc2010-09-14 14:43:44 +0000494 def test_wrapped_unconnected(self):
495 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200496 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000497 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200498 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100499 self.assertRaises(OSError, ss.recv, 1)
500 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
501 self.assertRaises(OSError, ss.recvfrom, 1)
502 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
503 self.assertRaises(OSError, ss.send, b'x')
504 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800505 self.assertRaises(NotImplementedError, ss.dup)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800506 self.assertRaises(NotImplementedError, ss.sendmsg,
507 [b'x'], (), 0, ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800508 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
509 self.assertRaises(NotImplementedError, ss.recvmsg_into,
510 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000511
Antoine Pitrou40f08742010-04-24 22:04:40 +0000512 def test_timeout(self):
513 # Issue #8524: when creating an SSL socket, the timeout of the
514 # original socket should be retained.
515 for timeout in (None, 0.0, 5.0):
516 s = socket.socket(socket.AF_INET)
517 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200518 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100519 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000520
Christian Heimesd0486372016-09-10 23:23:33 +0200521 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000523 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000524 "certfile must be specified",
525 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000526 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000527 "certfile must be specified for server-side operations",
528 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000529 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000530 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200531 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100532 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
533 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200534 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200535 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000536 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000537 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000538 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200539 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000540 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000541 ssl.wrap_socket(sock,
542 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000543 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200544 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000545 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000546 ssl.wrap_socket(sock,
547 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000548 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000549
Martin Panter3464ea22016-02-01 21:58:11 +0000550 def bad_cert_test(self, certfile):
551 """Check that trying to use the given client certificate fails"""
552 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
553 certfile)
554 sock = socket.socket()
555 self.addCleanup(sock.close)
556 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200557 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200558 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000559
560 def test_empty_cert(self):
561 """Wrapping with an empty cert file"""
562 self.bad_cert_test("nullcert.pem")
563
564 def test_malformed_cert(self):
565 """Wrapping with a badly formatted certificate (syntax error)"""
566 self.bad_cert_test("badcert.pem")
567
568 def test_malformed_key(self):
569 """Wrapping with a badly formatted key (syntax error)"""
570 self.bad_cert_test("badkey.pem")
571
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000572 def test_match_hostname(self):
573 def ok(cert, hostname):
574 ssl.match_hostname(cert, hostname)
575 def fail(cert, hostname):
576 self.assertRaises(ssl.CertificateError,
577 ssl.match_hostname, cert, hostname)
578
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100579 # -- Hostname matching --
580
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000581 cert = {'subject': ((('commonName', 'example.com'),),)}
582 ok(cert, 'example.com')
583 ok(cert, 'ExAmple.cOm')
584 fail(cert, 'www.example.com')
585 fail(cert, '.example.com')
586 fail(cert, 'example.org')
587 fail(cert, 'exampleXcom')
588
589 cert = {'subject': ((('commonName', '*.a.com'),),)}
590 ok(cert, 'foo.a.com')
591 fail(cert, 'bar.foo.a.com')
592 fail(cert, 'a.com')
593 fail(cert, 'Xa.com')
594 fail(cert, '.a.com')
595
Mandeep Singhede2ac92017-11-27 04:01:27 +0530596 # only match wildcards when they are the only thing
597 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000598 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530599 fail(cert, 'foo.com')
600 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000601 fail(cert, 'bar.com')
602 fail(cert, 'foo.a.com')
603 fail(cert, 'bar.foo.com')
604
Christian Heimes824f7f32013-08-17 00:54:47 +0200605 # NULL bytes are bad, CVE-2013-4073
606 cert = {'subject': ((('commonName',
607 'null.python.org\x00example.org'),),)}
608 ok(cert, 'null.python.org\x00example.org') # or raise an error?
609 fail(cert, 'example.org')
610 fail(cert, 'null.python.org')
611
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 # error cases with wildcards
613 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
614 fail(cert, 'bar.foo.a.com')
615 fail(cert, 'a.com')
616 fail(cert, 'Xa.com')
617 fail(cert, '.a.com')
618
619 cert = {'subject': ((('commonName', 'a.*.com'),),)}
620 fail(cert, 'a.foo.com')
621 fail(cert, 'a..com')
622 fail(cert, 'a.com')
623
624 # wildcard doesn't match IDNA prefix 'xn--'
625 idna = 'püthon.python.org'.encode("idna").decode("ascii")
626 cert = {'subject': ((('commonName', idna),),)}
627 ok(cert, idna)
628 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
629 fail(cert, idna)
630 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
631 fail(cert, idna)
632
633 # wildcard in first fragment and IDNA A-labels in sequent fragments
634 # are supported.
635 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
636 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530637 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100639 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
640 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
641
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000642 # Slightly fake real-world example
643 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
644 'subject': ((('commonName', 'linuxfrz.org'),),),
645 'subjectAltName': (('DNS', 'linuxfr.org'),
646 ('DNS', 'linuxfr.com'),
647 ('othername', '<unsupported>'))}
648 ok(cert, 'linuxfr.org')
649 ok(cert, 'linuxfr.com')
650 # Not a "DNS" entry
651 fail(cert, '<unsupported>')
652 # When there is a subjectAltName, commonName isn't used
653 fail(cert, 'linuxfrz.org')
654
655 # A pristine real-world example
656 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
657 'subject': ((('countryName', 'US'),),
658 (('stateOrProvinceName', 'California'),),
659 (('localityName', 'Mountain View'),),
660 (('organizationName', 'Google Inc'),),
661 (('commonName', 'mail.google.com'),))}
662 ok(cert, 'mail.google.com')
663 fail(cert, 'gmail.com')
664 # Only commonName is considered
665 fail(cert, 'California')
666
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100667 # -- IPv4 matching --
668 cert = {'subject': ((('commonName', 'example.com'),),),
669 'subjectAltName': (('DNS', 'example.com'),
670 ('IP Address', '10.11.12.13'),
671 ('IP Address', '14.15.16.17'))}
672 ok(cert, '10.11.12.13')
673 ok(cert, '14.15.16.17')
674 fail(cert, '14.15.16.18')
675 fail(cert, 'example.net')
676
677 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800678 if hasattr(socket, 'AF_INET6'):
679 cert = {'subject': ((('commonName', 'example.com'),),),
680 'subjectAltName': (
681 ('DNS', 'example.com'),
682 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
683 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
684 ok(cert, '2001::cafe')
685 ok(cert, '2003::baba')
686 fail(cert, '2003::bebe')
687 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100688
689 # -- Miscellaneous --
690
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000691 # Neither commonName nor subjectAltName
692 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
693 'subject': ((('countryName', 'US'),),
694 (('stateOrProvinceName', 'California'),),
695 (('localityName', 'Mountain View'),),
696 (('organizationName', 'Google Inc'),))}
697 fail(cert, 'mail.google.com')
698
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200699 # No DNS entry in subjectAltName but a commonName
700 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
701 'subject': ((('countryName', 'US'),),
702 (('stateOrProvinceName', 'California'),),
703 (('localityName', 'Mountain View'),),
704 (('commonName', 'mail.google.com'),)),
705 'subjectAltName': (('othername', 'blabla'), )}
706 ok(cert, 'mail.google.com')
707
708 # No DNS entry subjectAltName and no commonName
709 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
710 'subject': ((('countryName', 'US'),),
711 (('stateOrProvinceName', 'California'),),
712 (('localityName', 'Mountain View'),),
713 (('organizationName', 'Google Inc'),)),
714 'subjectAltName': (('othername', 'blabla'),)}
715 fail(cert, 'google.com')
716
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000717 # Empty cert / no cert
718 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
719 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
720
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200721 # Issue #17980: avoid denials of service by refusing more than one
722 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800723 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
724 with self.assertRaisesRegex(
725 ssl.CertificateError,
726 "partial wildcards in leftmost label are not supported"):
727 ssl.match_hostname(cert, 'axxb.example.com')
728
729 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
730 with self.assertRaisesRegex(
731 ssl.CertificateError,
732 "wildcard can only be present in the leftmost label"):
733 ssl.match_hostname(cert, 'www.sub.example.com')
734
735 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
736 with self.assertRaisesRegex(
737 ssl.CertificateError,
738 "too many wildcards"):
739 ssl.match_hostname(cert, 'axxbxxc.example.com')
740
741 cert = {'subject': ((('commonName', '*'),),)}
742 with self.assertRaisesRegex(
743 ssl.CertificateError,
744 "sole wildcard without additional labels are not support"):
745 ssl.match_hostname(cert, 'host')
746
747 cert = {'subject': ((('commonName', '*.com'),),)}
748 with self.assertRaisesRegex(
749 ssl.CertificateError,
750 r"hostname 'com' doesn't match '\*.com'"):
751 ssl.match_hostname(cert, 'com')
752
753 # extra checks for _inet_paton()
754 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
755 with self.assertRaises(ValueError):
756 ssl._inet_paton(invalid)
757 for ipaddr in ['127.0.0.1', '192.168.0.1']:
758 self.assertTrue(ssl._inet_paton(ipaddr))
759 if hasattr(socket, 'AF_INET6'):
760 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
761 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200762
Antoine Pitroud5323212010-10-22 18:19:07 +0000763 def test_server_side(self):
764 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200765 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000766 with socket.socket() as sock:
767 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
768 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000769
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.socket(socket.AF_INET)
    # Register the close immediately so the listening socket is released
    # even when a later step or assertion fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers the case where wrapping fails before the SSL socket takes
    # over the connection; closing again afterwards is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    # The handshake is skipped: only the argument validation is under test.
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200781
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected client-side socket returns None for a known type.
    client_raw = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_raw) as client_ssl:
        client_binding = client_ssl.get_channel_binding("tls-unique")
        self.assertIsNone(client_binding)
    # The same holds for an unconnected server-side socket.
    server_raw = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_raw, server_side=True,
                          certfile=CERTFILE) as server_ssl:
        server_binding = server_ssl.get_channel_binding("tls-unique")
        self.assertIsNone(server_binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200793
def test_dealloc_warn(self):
    # Dropping a wrapped socket without closing it must emit a
    # ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop this reference and force a collection while assertWarns
        # is still recording warnings.
        del wrapped
        support.gc_collect()
    warning_message = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_message)
801
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths tuple.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_FILE / SSL_CERT_DIR are reflected in cafile / capath.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_FILE"] = CERTFILE
        environ["SSL_CERT_DIR"] = CAPATH
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
813
Christian Heimes44109d72013-11-22 01:51:30 +0100814 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
815 def test_enum_certificates(self):
816 self.assertTrue(ssl.enum_certificates("CA"))
817 self.assertTrue(ssl.enum_certificates("ROOT"))
818
819 self.assertRaises(TypeError, ssl.enum_certificates)
820 self.assertRaises(WindowsError, ssl.enum_certificates, "")
821
Christian Heimesc2d65e12013-11-22 16:13:55 +0100822 trust_oids = set()
823 for storename in ("CA", "ROOT"):
824 store = ssl.enum_certificates(storename)
825 self.assertIsInstance(store, list)
826 for element in store:
827 self.assertIsInstance(element, tuple)
828 self.assertEqual(len(element), 3)
829 cert, enc, trust = element
830 self.assertIsInstance(cert, bytes)
831 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
832 self.assertIsInstance(trust, (set, bool))
833 if isinstance(trust, set):
834 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100835
836 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100837 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200838
Christian Heimes46bebee2013-06-09 19:03:31 +0200839 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100840 def test_enum_crls(self):
841 self.assertTrue(ssl.enum_crls("CA"))
842 self.assertRaises(TypeError, ssl.enum_crls)
843 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200844
Christian Heimes44109d72013-11-22 01:51:30 +0100845 crls = ssl.enum_crls("CA")
846 self.assertIsInstance(crls, list)
847 for element in crls:
848 self.assertIsInstance(element, tuple)
849 self.assertEqual(len(element), 2)
850 self.assertIsInstance(element[0], bytes)
851 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200852
Christian Heimes46bebee2013-06-09 19:03:31 +0200853
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100854 def test_asn1object(self):
855 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
856 '1.3.6.1.5.5.7.3.1')
857
858 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
859 self.assertEqual(val, expected)
860 self.assertEqual(val.nid, 129)
861 self.assertEqual(val.shortname, 'serverAuth')
862 self.assertEqual(val.longname, 'TLS Web Server Authentication')
863 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
864 self.assertIsInstance(val, ssl._ASN1Object)
865 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
866
867 val = ssl._ASN1Object.fromnid(129)
868 self.assertEqual(val, expected)
869 self.assertIsInstance(val, ssl._ASN1Object)
870 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100871 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
872 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100873 for i in range(1000):
874 try:
875 obj = ssl._ASN1Object.fromnid(i)
876 except ValueError:
877 pass
878 else:
879 self.assertIsInstance(obj.nid, int)
880 self.assertIsInstance(obj.shortname, str)
881 self.assertIsInstance(obj.longname, str)
882 self.assertIsInstance(obj.oid, (str, type(None)))
883
884 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
885 self.assertEqual(val, expected)
886 self.assertIsInstance(val, ssl._ASN1Object)
887 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
888 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
889 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100890 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
891 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100892
Christian Heimes72d28502013-11-23 13:56:58 +0100893 def test_purpose_enum(self):
894 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
895 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
896 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
897 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
898 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
899 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
900 '1.3.6.1.5.5.7.3.1')
901
902 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
903 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
904 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
905 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
906 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
907 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
908 '1.3.6.1.5.5.7.3.2')
909
def test_unsupported_dtls(self):
    # Wrapping a datagram socket is refused, both through the module's
    # test_wrap_socket helper and through SSLContext.wrap_socket().
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)
    expected_message = "only stream sockets are supported"

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception), expected_message)
920
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that *timestring* converts to exactly *timestamp*.
    converted = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(converted, timestamp)
923
def cert_time_fail(self, timestring):
    # Helper: assert that *timestring* is rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
927
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    samples = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in samples:
        self.cert_time_ok(timestring, timestamp)
935
936 def test_cert_time_to_seconds(self):
937 timestring = "Jan 5 09:34:43 2018 GMT"
938 ts = 1515144883.0
939 self.cert_time_ok(timestring, ts)
940 # accept keyword parameter, assert its name
941 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
942 # accept both %e and %d (space or zero generated by strftime)
943 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
944 # case-insensitive
945 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
946 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
947 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
948 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
949 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
950 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
951 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
952 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
953
954 newyear_ts = 1230768000.0
955 # leap seconds
956 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
957 # same timestamp
958 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
959
960 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
961 # allow 60th second (even if it is not a leap second)
962 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
963 # allow 2nd leap second for compatibility with time.strptime()
964 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
965 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
966
Mike53f7a7c2017-12-14 14:04:53 +0300967 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200968 # 99991231235959Z (rfc 5280)
969 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
970
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as formatted under the active locale.
    localized_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if localized_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent: the C-locale name parses, the localized one
    # is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(localized_feb + " 9 00:00:00 2007 GMT")
985
def test_connect_ex_error(self):
    # connect_ex() against a bound but non-listening port must return
    # one of the expected errno values.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    port = support.bind_port(placeholder)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
1001
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001002
Antoine Pitrou152efa22010-05-16 18:19:27 +00001003class ContextTests(unittest.TestCase):
1004
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # With no argument the context reports PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Unknown protocol numbers are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
1013
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        created = ssl.SSLContext(requested)
        self.assertEqual(created.protocol, requested)
1019
1020 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001021 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001022 ctx.set_ciphers("ALL")
1023 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001024 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001025 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in any enabled suite name.
    excluded_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in excluded_markers:
            self.assertNotIn(marker, suite_name)
1039
Christian Heimes25bfcd52016-09-06 00:04:45 +02001040 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1041 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001042 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001043 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001044 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001045 self.assertIn('AES256-GCM-SHA384', names)
1046 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001047
@skip_if_broken_ubuntu_ssl
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    base_options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    context_extras = (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                      OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                      OP_ENABLE_MIDDLEBOX_COMPAT)
    expected = base_options | context_extras
    self.assertEqual(expected, client_ctx.options)

    # Adding a flag is always possible.
    client_ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Without clearing support, dropping flags is an error.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options = (client_ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1069
Christian Heimesa170fa12017-09-15 20:27:30 +02001070 def test_verify_mode_protocol(self):
1071 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001072 # Default value
1073 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1074 ctx.verify_mode = ssl.CERT_OPTIONAL
1075 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1076 ctx.verify_mode = ssl.CERT_REQUIRED
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1078 ctx.verify_mode = ssl.CERT_NONE
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 with self.assertRaises(TypeError):
1081 ctx.verify_mode = None
1082 with self.assertRaises(ValueError):
1083 ctx.verify_mode = 42
1084
Christian Heimesa170fa12017-09-15 20:27:30 +02001085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1087 self.assertFalse(ctx.check_hostname)
1088
1089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1090 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1091 self.assertTrue(ctx.check_hostname)
1092
Christian Heimes61d478c2018-01-27 15:51:38 +01001093 def test_hostname_checks_common_name(self):
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1097 ctx.hostname_checks_common_name = True
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 ctx.hostname_checks_common_name = False
1100 self.assertFalse(ctx.hostname_checks_common_name)
1101 ctx.hostname_checks_common_name = True
1102 self.assertTrue(ctx.hostname_checks_common_name)
1103 else:
1104 with self.assertRaises(AttributeError):
1105 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001106
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    tls = ssl.TLSVersion

    def expect_range(context, lowest, highest):
        # Check both ends of the configured version range at once.
        self.assertEqual(context.minimum_version, lowest)
        self.assertEqual(context.maximum_version, highest)

    # A fresh server context spans the whole supported range.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    expect_range(ctx, tls.MINIMUM_SUPPORTED, tls.MAXIMUM_SUPPORTED)

    # Explicit versions are read back as set.
    ctx.minimum_version = tls.TLSv1_1
    ctx.maximum_version = tls.TLSv1_2
    expect_range(ctx, tls.TLSv1_1, tls.TLSv1_2)

    ctx.minimum_version = tls.MINIMUM_SUPPORTED
    ctx.maximum_version = tls.TLSv1
    expect_range(ctx, tls.MINIMUM_SUPPORTED, tls.TLSv1)

    ctx.maximum_version = tls.MAXIMUM_SUPPORTED
    self.assertEqual(ctx.maximum_version, tls.MAXIMUM_SUPPORTED)

    # Assigning the "wrong end" marker reads back as a concrete version.
    ctx.maximum_version = tls.MINIMUM_SUPPORTED
    self.assertIn(ctx.maximum_version, {tls.TLSv1, tls.SSLv3})

    ctx.minimum_version = tls.MAXIMUM_SUPPORTED
    self.assertIn(ctx.minimum_version, {tls.TLSv1_2, tls.TLSv1_3})

    # Values outside the enum are rejected.
    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context pinned to one protocol version reports the full range
    # but refuses any attempt to change it.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
    expect_range(ctx, tls.MINIMUM_SUPPORTED, tls.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        ctx.minimum_version = tls.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        ctx.maximum_version = tls.TLSv1
1168
1169
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value (VERIFY_X509_TRUSTED_FIRST is included when the
    # ssl module defines it)
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Each assignment, including a combination, is read back unchanged
    # (supports any value).
    assignments = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignments:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1189
def test_load_cert_chain(self):
    pem_error = "PEM lib"

    def fresh_passwords():
        # The same password as str, bytes and bytearray, built anew on
        # every iteration.
        yield KEY_PASSWORD
        yield KEY_PASSWORD.encode()
        yield bytearray(KEY_PASSWORD.encode())

    # Combined key and cert in a single file
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for key_arg in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=key_arg)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for broken_file in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(broken_file)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, pem_error):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in fresh_passwords():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in fresh_passwords():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def getpass_unicode():
        return KEY_PASSWORD

    def getpass_bytes():
        return KEY_PASSWORD.encode()

    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def getpass_badpass():
        return "badpass"

    def getpass_huge():
        return b'a' * (1024 * 1024)

    def getpass_bad_type():
        return 9

    def getpass_exception():
        raise Exception('getpass error')

    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    # Plain functions, a callable instance and a bound method all work.
    working_callbacks = (
        getpass_unicode,
        getpass_bytes,
        getpass_bytearray,
        GetPassCallable(),
        GetPassCallable().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001272
def test_load_verify_locations(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given as str or bytes, positionally or by
    # keyword.
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        server_ctx.load_verify_locations(ca_file)
        server_ctx.load_verify_locations(cafile=ca_file, capath=None)
    # At least one real location has to be supplied.
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        server_ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        server_ctx.load_verify_locations(BADCERT)
    # A CA directory may be combined with a CA file, as str or bytes.
    server_ctx.load_verify_locations(CERTFILE, CAPATH)
    server_ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, True)
1291
Christian Heimesefff7062013-11-21 03:35:02 +01001292 def test_load_verify_cadata(self):
1293 # test cadata
1294 with open(CAFILE_CACERT) as f:
1295 cacert_pem = f.read()
1296 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1297 with open(CAFILE_NEURONIO) as f:
1298 neuronio_pem = f.read()
1299 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1300
1301 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001302 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001303 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1304 ctx.load_verify_locations(cadata=cacert_pem)
1305 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1306 ctx.load_verify_locations(cadata=neuronio_pem)
1307 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1308 # cert already in hash table
1309 ctx.load_verify_locations(cadata=neuronio_pem)
1310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1311
1312 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001313 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001314 combined = "\n".join((cacert_pem, neuronio_pem))
1315 ctx.load_verify_locations(cadata=combined)
1316 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1317
1318 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001319 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001320 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1321 neuronio_pem, "tail"]
1322 ctx.load_verify_locations(cadata="\n".join(combined))
1323 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1324
1325 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001326 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001327 ctx.load_verify_locations(cadata=cacert_der)
1328 ctx.load_verify_locations(cadata=neuronio_der)
1329 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1330 # cert already in hash table
1331 ctx.load_verify_locations(cadata=cacert_der)
1332 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1333
1334 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001335 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001336 combined = b"".join((cacert_der, neuronio_der))
1337 ctx.load_verify_locations(cadata=combined)
1338 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1339
1340 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001341 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001342 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1343
1344 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1345 ctx.load_verify_locations(cadata="broken")
1346 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1347 ctx.load_verify_locations(cadata=b"broken")
1348
1349
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001350 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001351 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001352 ctx.load_dh_params(DHFILE)
1353 if os.name != 'nt':
1354 ctx.load_dh_params(BYTES_DHFILE)
1355 self.assertRaises(TypeError, ctx.load_dh_params)
1356 self.assertRaises(TypeError, ctx.load_dh_params, None)
1357 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001358 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001359 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001360 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001361 ctx.load_dh_params(CERTFILE)
1362
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001363 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001364 def test_session_stats(self):
1365 for proto in PROTOCOLS:
1366 ctx = ssl.SSLContext(proto)
1367 self.assertEqual(ctx.session_stats(), {
1368 'number': 0,
1369 'connect': 0,
1370 'connect_good': 0,
1371 'connect_renegotiate': 0,
1372 'accept': 0,
1373 'accept_good': 0,
1374 'accept_renegotiate': 0,
1375 'hits': 0,
1376 'misses': 0,
1377 'timeouts': 0,
1378 'cache_full': 0,
1379 })
1380
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001381 def test_set_default_verify_paths(self):
1382 # There's not much we can do to test that it acts as expected,
1383 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001384 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001385 ctx.set_default_verify_paths()
1386
Antoine Pitrou501da612011-12-21 09:27:41 +01001387 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001388 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001389 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001390 ctx.set_ecdh_curve("prime256v1")
1391 ctx.set_ecdh_curve(b"prime256v1")
1392 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1393 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1394 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1395 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1396
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001397 @needs_sni
1398 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001399 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001400
1401 # set_servername_callback expects a callable, or None
1402 self.assertRaises(TypeError, ctx.set_servername_callback)
1403 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1404 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1405 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1406
1407 def dummycallback(sock, servername, ctx):
1408 pass
1409 ctx.set_servername_callback(None)
1410 ctx.set_servername_callback(dummycallback)
1411
1412 @needs_sni
1413 def test_sni_callback_refcycle(self):
1414 # Reference cycles through the servername callback are detected
1415 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001416 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001417 def dummycallback(sock, servername, ctx, cycle=ctx):
1418 pass
1419 ctx.set_servername_callback(dummycallback)
1420 wr = weakref.ref(ctx)
1421 del ctx, dummycallback
1422 gc.collect()
1423 self.assertIs(wr(), None)
1424
Christian Heimes9a5395a2013-06-17 15:44:12 +02001425 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001426 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001427 self.assertEqual(ctx.cert_store_stats(),
1428 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1429 ctx.load_cert_chain(CERTFILE)
1430 self.assertEqual(ctx.cert_store_stats(),
1431 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1432 ctx.load_verify_locations(CERTFILE)
1433 self.assertEqual(ctx.cert_store_stats(),
1434 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001435 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001436 self.assertEqual(ctx.cert_store_stats(),
1437 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1438
1439 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001440 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001441 self.assertEqual(ctx.get_ca_certs(), [])
1442 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1443 ctx.load_verify_locations(CERTFILE)
1444 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001445 # but CAFILE_CACERT is a CA cert
1446 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001447 self.assertEqual(ctx.get_ca_certs(),
1448 [{'issuer': ((('organizationName', 'Root CA'),),
1449 (('organizationalUnitName', 'http://www.cacert.org'),),
1450 (('commonName', 'CA Cert Signing Authority'),),
1451 (('emailAddress', 'support@cacert.org'),)),
1452 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1453 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1454 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001455 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001456 'subject': ((('organizationName', 'Root CA'),),
1457 (('organizationalUnitName', 'http://www.cacert.org'),),
1458 (('commonName', 'CA Cert Signing Authority'),),
1459 (('emailAddress', 'support@cacert.org'),)),
1460 'version': 3}])
1461
Martin Panterb55f8b72016-01-14 12:53:56 +00001462 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001463 pem = f.read()
1464 der = ssl.PEM_cert_to_DER_cert(pem)
1465 self.assertEqual(ctx.get_ca_certs(True), [der])
1466
Christian Heimes72d28502013-11-23 13:56:58 +01001467 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001469 ctx.load_default_certs()
1470
Christian Heimesa170fa12017-09-15 20:27:30 +02001471 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001472 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1473 ctx.load_default_certs()
1474
Christian Heimesa170fa12017-09-15 20:27:30 +02001475 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001476 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1477
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001479 self.assertRaises(TypeError, ctx.load_default_certs, None)
1480 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1481
Benjamin Peterson91244e02014-10-03 18:17:15 -04001482 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001483 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001484 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001485 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001486 with support.EnvironmentVarGuard() as env:
1487 env["SSL_CERT_DIR"] = CAPATH
1488 env["SSL_CERT_FILE"] = CERTFILE
1489 ctx.load_default_certs()
1490 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1491
Benjamin Peterson91244e02014-10-03 18:17:15 -04001492 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001493 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001494 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001495 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001496 ctx.load_default_certs()
1497 stats = ctx.cert_store_stats()
1498
Christian Heimesa170fa12017-09-15 20:27:30 +02001499 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001500 with support.EnvironmentVarGuard() as env:
1501 env["SSL_CERT_DIR"] = CAPATH
1502 env["SSL_CERT_FILE"] = CERTFILE
1503 ctx.load_default_certs()
1504 stats["x509"] += 1
1505 self.assertEqual(ctx.cert_store_stats(), stats)
1506
Christian Heimes358cfd42016-09-10 22:43:48 +02001507 def _assert_context_options(self, ctx):
1508 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1509 if OP_NO_COMPRESSION != 0:
1510 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1511 OP_NO_COMPRESSION)
1512 if OP_SINGLE_DH_USE != 0:
1513 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1514 OP_SINGLE_DH_USE)
1515 if OP_SINGLE_ECDH_USE != 0:
1516 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1517 OP_SINGLE_ECDH_USE)
1518 if OP_CIPHER_SERVER_PREFERENCE != 0:
1519 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1520 OP_CIPHER_SERVER_PREFERENCE)
1521
Christian Heimes4c05b472013-11-23 15:58:30 +01001522 def test_create_default_context(self):
1523 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001524
Christian Heimesa170fa12017-09-15 20:27:30 +02001525 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001526 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001527 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001528 self._assert_context_options(ctx)
1529
Christian Heimes4c05b472013-11-23 15:58:30 +01001530 with open(SIGNING_CA) as f:
1531 cadata = f.read()
1532 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1533 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001534 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001535 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001536 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001537
1538 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001539 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001541 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001542
Christian Heimes67986f92013-11-23 22:43:47 +01001543 def test__create_stdlib_context(self):
1544 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001545 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001546 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001547 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001548 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001549
1550 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1551 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1552 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001553 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001554
1555 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001556 cert_reqs=ssl.CERT_REQUIRED,
1557 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001558 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1559 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001560 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001561 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001562
1563 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001564 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001565 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001566 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001567
Christian Heimes1aa9a752013-12-02 02:41:19 +01001568 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001569 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001570 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001571 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001572
Christian Heimese82c0342017-09-15 20:29:57 +02001573 # Auto set CERT_REQUIRED
1574 ctx.check_hostname = True
1575 self.assertTrue(ctx.check_hostname)
1576 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1577 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001578 ctx.verify_mode = ssl.CERT_REQUIRED
1579 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001580 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001581
Christian Heimese82c0342017-09-15 20:29:57 +02001582 # Changing verify_mode does not affect check_hostname
1583 ctx.check_hostname = False
1584 ctx.verify_mode = ssl.CERT_NONE
1585 ctx.check_hostname = False
1586 self.assertFalse(ctx.check_hostname)
1587 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1588 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001589 ctx.check_hostname = True
1590 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001591 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1592
1593 ctx.check_hostname = False
1594 ctx.verify_mode = ssl.CERT_OPTIONAL
1595 ctx.check_hostname = False
1596 self.assertFalse(ctx.check_hostname)
1597 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1598 # keep CERT_OPTIONAL
1599 ctx.check_hostname = True
1600 self.assertTrue(ctx.check_hostname)
1601 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001602
1603 # Cannot set CERT_NONE with check_hostname enabled
1604 with self.assertRaises(ValueError):
1605 ctx.verify_mode = ssl.CERT_NONE
1606 ctx.check_hostname = False
1607 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001608 ctx.verify_mode = ssl.CERT_NONE
1609 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001610
Christian Heimes5fe668c2016-09-12 00:01:11 +02001611 def test_context_client_server(self):
1612 # PROTOCOL_TLS_CLIENT has sane defaults
1613 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1614 self.assertTrue(ctx.check_hostname)
1615 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1616
1617 # PROTOCOL_TLS_SERVER has different but also sane defaults
1618 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1619 self.assertFalse(ctx.check_hostname)
1620 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1621
Christian Heimes4df60f12017-09-15 20:26:05 +02001622 def test_context_custom_class(self):
1623 class MySSLSocket(ssl.SSLSocket):
1624 pass
1625
1626 class MySSLObject(ssl.SSLObject):
1627 pass
1628
1629 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1630 ctx.sslsocket_class = MySSLSocket
1631 ctx.sslobject_class = MySSLObject
1632
1633 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1634 self.assertIsInstance(sock, MySSLSocket)
1635 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1636 self.assertIsInstance(obj, MySSLObject)
1637
Antoine Pitrou152efa22010-05-16 18:19:27 +00001638
class SSLErrorTests(unittest.TestCase):
    """Tests for SSLError attributes and for server_hostname validation."""

    def test_str(self):
        """str() of an SSLError is the message only; errno stays available."""
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        """SSLError exposes the failing library and reason as attributes."""
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        """A non-blocking handshake with no peer data raises SSLWantReadError."""
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            # Manage the raw client socket too, so it is closed even when
            # connect() or setblocking() fails before it gets wrapped.
            # Closing it again after wrap_socket() has taken over the file
            # descriptor is harmless.
            with socket.socket() as raw_client:
                raw_client.connect(listener.getsockname())
                raw_client.setblocking(False)
                with ctx.wrap_socket(raw_client, False,
                                     do_handshake_on_connect=False) as client:
                    # Nothing ever accepts or answers on the listener, so the
                    # handshake cannot complete and must ask for more input.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        client.do_handshake()
                    message = str(cm.exception)
                    self.assertTrue(
                        message.startswith(
                            "The operation did not complete (read)"),
                        message)
                    # For compatibility
                    self.assertEqual(cm.exception.errno,
                                     ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        """wrap_bio() rejects empty, dot-leading and NUL-containing names."""
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001693
1694
class MemoryBIOTests(unittest.TestCase):
    """Tests for the in-memory BIO buffer object, ssl.MemoryBIO."""

    def test_read_write(self):
        """Written data is read back once, in order, whole or in pieces."""
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')

        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')

        # Sized reads drain the buffer piece by piece.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        """eof only turns true after write_eof() and a fully drained buffer."""
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        # Data is still pending, so EOF has not been reached yet.
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        """pending follows the number of unread bytes."""
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Reading one byte at a time counts down to zero ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... and writing one byte at a time counts back up.
        for queued in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, queued)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        """write() accepts bytes, bytearray and memoryview objects."""
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        """write() raises TypeError for str, None, bool and int arguments."""
        buf = ssl.MemoryBIO()
        for invalid in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(invalid)
1756
1757
class SSLObjectTests(unittest.TestCase):
    """Tests for ssl.SSLObject, driven through memory BIOs."""

    def test_private_init(self):
        """SSLObject cannot be instantiated directly."""
        buf = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(buf, buf)

    def test_unwrap(self):
        """unwrap() on both sides exchanges close-notify alerts."""
        client_ctx, server_ctx, hostname = testing_context()
        c_in, c_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        s_in, s_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
        server = server_ctx.wrap_bio(s_in, s_out, server_side=True)

        def advance(endpoint, outgoing, peer_incoming):
            # Run one handshake step, then forward whatever the endpoint
            # produced to its peer's input buffer.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass
            if outgoing.pending:
                peer_incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance(client, c_out, s_in)
            advance(server, s_out, c_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        s_in.write(c_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        c_in.write(s_out.read())
        client.unwrap()
Christian Heimes89c20512018-02-27 11:17:32 +01001805
Martin Panter3840b2a2016-03-27 01:53:46 +00001806class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001807 """Tests that connect to a simple server running in the background"""
1808
1809 def setUp(self):
1810 server = ThreadedEchoServer(SIGNED_CERTFILE)
1811 self.server_addr = (HOST, server.port)
1812 server.__enter__()
1813 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001814
Antoine Pitrou480a1242010-04-28 21:37:09 +00001815 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001816 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001817 cert_reqs=ssl.CERT_NONE) as s:
1818 s.connect(self.server_addr)
1819 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001820 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001821
Martin Panter3840b2a2016-03-27 01:53:46 +00001822 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001823 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001824 cert_reqs=ssl.CERT_REQUIRED,
1825 ca_certs=SIGNING_CA) as s:
1826 s.connect(self.server_addr)
1827 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001828 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001829
Martin Panter3840b2a2016-03-27 01:53:46 +00001830 def test_connect_fail(self):
1831 # This should fail because we have no verification certs. Connection
1832 # failure crashes ThreadedEchoServer, so run this in an independent
1833 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001834 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001835 cert_reqs=ssl.CERT_REQUIRED)
1836 self.addCleanup(s.close)
1837 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1838 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001839
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001840 def test_connect_ex(self):
1841 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001842 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 cert_reqs=ssl.CERT_REQUIRED,
1844 ca_certs=SIGNING_CA)
1845 self.addCleanup(s.close)
1846 self.assertEqual(0, s.connect_ex(self.server_addr))
1847 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001848
1849 def test_non_blocking_connect_ex(self):
1850 # Issue #11326: non-blocking connect_ex() should allow handshake
1851 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001852 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001853 cert_reqs=ssl.CERT_REQUIRED,
1854 ca_certs=SIGNING_CA,
1855 do_handshake_on_connect=False)
1856 self.addCleanup(s.close)
1857 s.setblocking(False)
1858 rc = s.connect_ex(self.server_addr)
1859 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1860 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1861 # Wait for connect to finish
1862 select.select([], [s], [], 5.0)
1863 # Non-blocking handshake
1864 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001865 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001866 s.do_handshake()
1867 break
1868 except ssl.SSLWantReadError:
1869 select.select([s], [], [], 5.0)
1870 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001871 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 # SSL established
1873 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001874
Antoine Pitrou152efa22010-05-16 18:19:27 +00001875 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001876 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001877 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001878 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1879 s.connect(self.server_addr)
1880 self.assertEqual({}, s.getpeercert())
1881 # Same with a server hostname
1882 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1883 server_hostname="dummy") as s:
1884 s.connect(self.server_addr)
1885 ctx.verify_mode = ssl.CERT_REQUIRED
1886 # This should succeed because we specify the root cert
1887 ctx.load_verify_locations(SIGNING_CA)
1888 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1889 s.connect(self.server_addr)
1890 cert = s.getpeercert()
1891 self.assertTrue(cert)
1892
1893 def test_connect_with_context_fail(self):
1894 # This should fail because we have no verification certs. Connection
1895 # failure crashes ThreadedEchoServer, so run this in an independent
1896 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001897 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001898 ctx.verify_mode = ssl.CERT_REQUIRED
1899 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1900 self.addCleanup(s.close)
1901 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1902 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001903
1904 def test_connect_capath(self):
1905 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001906 # NOTE: the subject hashing algorithm has been changed between
1907 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1908 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001909 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001910 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001911 ctx.verify_mode = ssl.CERT_REQUIRED
1912 ctx.load_verify_locations(capath=CAPATH)
1913 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1914 s.connect(self.server_addr)
1915 cert = s.getpeercert()
1916 self.assertTrue(cert)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07001917
Martin Panter3840b2a2016-03-27 01:53:46 +00001918 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001919 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001920 ctx.verify_mode = ssl.CERT_REQUIRED
1921 ctx.load_verify_locations(capath=BYTES_CAPATH)
1922 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1923 s.connect(self.server_addr)
1924 cert = s.getpeercert()
1925 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001926
Christian Heimesefff7062013-11-21 03:35:02 +01001927 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001928 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001929 pem = f.read()
1930 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001931 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001932 ctx.verify_mode = ssl.CERT_REQUIRED
1933 ctx.load_verify_locations(cadata=pem)
1934 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1935 s.connect(self.server_addr)
1936 cert = s.getpeercert()
1937 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001938
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001940 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 ctx.verify_mode = ssl.CERT_REQUIRED
1942 ctx.load_verify_locations(cadata=der)
1943 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1944 s.connect(self.server_addr)
1945 cert = s.getpeercert()
1946 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001947
Antoine Pitroue3220242010-04-24 11:13:53 +00001948 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1949 def test_makefile_close(self):
1950 # Issue #5238: creating a file-like object with makefile() shouldn't
1951 # delay closing the underlying "real socket" (here tested with its
1952 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001953 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001954 ss.connect(self.server_addr)
1955 fd = ss.fileno()
1956 f = ss.makefile()
1957 f.close()
1958 # The fd is still open
1959 os.read(fd, 0)
1960 # Closing the SSL socket should close the fd too
1961 ss.close()
1962 gc.collect()
1963 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001964 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001965 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001966
Antoine Pitrou480a1242010-04-28 21:37:09 +00001967 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001968 s = socket.socket(socket.AF_INET)
1969 s.connect(self.server_addr)
1970 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001971 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001972 cert_reqs=ssl.CERT_NONE,
1973 do_handshake_on_connect=False)
1974 self.addCleanup(s.close)
1975 count = 0
1976 while True:
1977 try:
1978 count += 1
1979 s.do_handshake()
1980 break
1981 except ssl.SSLWantReadError:
1982 select.select([s], [], [])
1983 except ssl.SSLWantWriteError:
1984 select.select([], [s], [])
1985 if support.verbose:
1986 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001987
Antoine Pitrou480a1242010-04-28 21:37:09 +00001988 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001989 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001990
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 def test_get_server_certificate_fail(self):
1992 # Connection failure crashes ThreadedEchoServer, so run this in an
1993 # independent test method
1994 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001995
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001996 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001997 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001998 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1999 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002000 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002001 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2002 s.connect(self.server_addr)
2003 # Error checking can happen at instantiation or when connecting
2004 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2005 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002006 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002007 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2008 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002009
Christian Heimes9a5395a2013-06-17 15:44:12 +02002010 def test_get_ca_certs_capath(self):
2011 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002012 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002013 ctx.load_verify_locations(capath=CAPATH)
2014 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002015 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2016 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002017 s.connect(self.server_addr)
2018 cert = s.getpeercert()
2019 self.assertTrue(cert)
2020 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002021
Christian Heimes575596e2013-12-15 21:49:17 +01002022 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002023 def test_context_setget(self):
2024 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002025 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2026 ctx1.load_verify_locations(capath=CAPATH)
2027 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2028 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002029 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002030 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002031 ss.connect(self.server_addr)
2032 self.assertIs(ss.context, ctx1)
2033 self.assertIs(ss._sslobj.context, ctx1)
2034 ss.context = ctx2
2035 self.assertIs(ss.context, ctx2)
2036 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002037
2038 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2039 # A simple IO loop. Call func(*args) depending on the error we get
2040 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2041 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002042 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002043 count = 0
2044 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002045 if time.monotonic() > deadline:
2046 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002047 errno = None
2048 count += 1
2049 try:
2050 ret = func(*args)
2051 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002052 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002053 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002054 raise
2055 errno = e.errno
2056 # Get any data from the outgoing BIO irrespective of any error, and
2057 # send it to the socket.
2058 buf = outgoing.read()
2059 sock.sendall(buf)
2060 # If there's no error, we're done. For WANT_READ, we need to get
2061 # data from the socket and put it in the incoming BIO.
2062 if errno is None:
2063 break
2064 elif errno == ssl.SSL_ERROR_WANT_READ:
2065 buf = sock.recv(32768)
2066 if buf:
2067 incoming.write(buf)
2068 else:
2069 incoming.write_eof()
2070 if support.verbose:
2071 sys.stdout.write("Needed %d calls to complete %s().\n"
2072 % (count, func.__name__))
2073 return ret
2074
Martin Panter3840b2a2016-03-27 01:53:46 +00002075 def test_bio_handshake(self):
2076 sock = socket.socket(socket.AF_INET)
2077 self.addCleanup(sock.close)
2078 sock.connect(self.server_addr)
2079 incoming = ssl.MemoryBIO()
2080 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002081 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2082 self.assertTrue(ctx.check_hostname)
2083 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002084 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002085 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2086 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002087 self.assertIs(sslobj._sslobj.owner, sslobj)
2088 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002089 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002090 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002091 self.assertRaises(ValueError, sslobj.getpeercert)
2092 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2093 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2094 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2095 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002096 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002097 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002098 self.assertTrue(sslobj.getpeercert())
2099 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2100 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2101 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002102 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002103 except ssl.SSLSyscallError:
2104 # If the server shuts down the TCP connection without sending a
2105 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2106 pass
2107 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2108
2109 def test_bio_read_write_data(self):
2110 sock = socket.socket(socket.AF_INET)
2111 self.addCleanup(sock.close)
2112 sock.connect(self.server_addr)
2113 incoming = ssl.MemoryBIO()
2114 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002115 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002116 ctx.verify_mode = ssl.CERT_NONE
2117 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2118 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2119 req = b'FOO\n'
2120 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2121 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2122 self.assertEqual(buf, b'foo\n')
2123 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002124
2125
class NetworkedTests(unittest.TestCase):
    # Tests that talk to hosts on the public internet; each one runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run the certificate retrieval checks against an IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2147
Martin Panter3840b2a2016-03-27 01:53:46 +00002148
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail *test* if empty.

    The first retrieval passes no CA file; the second passes *cert* as
    ca_certs.  In verbose mode the result of the second retrieval is printed.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2159
def _test_get_server_certificate_fail(test, host, port):
    """Check that retrieving the certificate with CERTFILE as CA fails.

    An ssl.SSLError is the expected outcome (printed in verbose mode); if a
    certificate is returned instead, *test* is failed.
    """
    address = (host, port)
    try:
        pem = ssl.get_server_certificate(address, ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2169
2170
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002171from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002172
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a daemon thread, used as a context manager.

    Connections are accepted one at a time: each is handed to a
    ConnectionHandler thread, and the accept loop joins that thread before
    accepting again.  Results of the handshakes (selected NPN/ALPN
    protocols, shared ciphers, connection errors as strings) are collected
    in list attributes for the tests to inspect.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning server and the accepted socket/address."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock with the server's SSL context.

            Returns True on success.  On failure the error text is appended
            to server.conn_errors, the connection is closed and False is
            returned; for SSLError/OSError other than the reset/broken-pipe
            cases the server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # Unlike the handler below, this one does not stop the server.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record the shared ciphers and, in
                # verbose+chatty mode, print details of the connection.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else from the socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        # NOTE(review): the parameter name shadows the builtin `bytes`
        # inside this method.
        def write(self, bytes):
            """Write to the SSL connection if wrapped, else to the socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the plain socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over' or an error.

            Recognised commands (compared after stripping whitespace):
            b'over', b'STARTTLS' and b'ENDTLS' (only when the server was
            created with starttls_server), b'CB tls-unique', b'PHA',
            b'HASCERT' and b'GETCERT'.  Anything else is echoed back
            lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                # Not a STARTTLS server: encrypt right away.
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Drop back to the plain socket.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            # Report the failure to the client instead of OK.
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        # Default behaviour: echo the message lower-cased.
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the SSL context (unless one is given) and bind a socket.

        When *context* is truthy it is used as-is and the certificate,
        ssl_version, certreqs, cacerts, npn_protocols, alpn_protocols and
        ciphers arguments are ignored.  Otherwise a new SSLContext is built
        from them, defaulting to PROTOCOL_TLS_SERVER and CERT_NONE.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the socket is listening.
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and wait until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the accept loop to stop and wait for the thread to finish."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept loop: serve connections one by one until stop() is called."""
        # The accept timeout lets the loop re-check self.active regularly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for this handler before accepting the next connection.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                # Any other failure is reported (in verbose+chatty mode) and
                # the loop keeps running.
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Make the accept loop exit after its current iteration."""
        self.active = False
2434
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread that runs an asyncore loop; a context manager."""

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: handshakes, then echoes lower-cased."""

            def __init__(self, conn, certfile):
                """Wrap *conn* server-side and begin the handshake."""
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until do_handshake() has completed without error.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Drain data pending inside the SSL layer; always readable."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step.

                Want-read/want-write leave the handshake pending; EOF and
                ECONNABORTED close the connection; other SSLErrors are
                re-raised.  On success _ssl_accepting is cleared.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # NOTE(review): any other OSError is swallowed here and
                    # _ssl_accepting stays True.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        # Empty read: close the connection.
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                """Close the connection, logging it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                """Re-raise the exception currently being handled."""
                raise

        def __init__(self, certfile):
            """Bind a listening socket; *certfile* is passed to handlers."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Create a ConnectionHandler for the newly accepted socket."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            """Re-raise the exception currently being handled."""
            raise

    def __init__(self, certfile):
        """Create the EchoServer and expose its port as self.port."""
        # Optional event set by run() when the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the thread and wait until run() has set the flag."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server, join the thread, and close all asyncore channels."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set by run()."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop(1) repeatedly until stop() clears self.active."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Every exception from the loop is deliberately ignored.
                pass

    def stop(self):
        """Clear the active flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002552
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as bytes, bytearray and memoryview) and
    each reply must equal indata.lower(), otherwise AssertionError is
    raised.  *chatty* and *connectionchatty* are forwarded to the server;
    *connectionchatty* also enables client-side tracing in verbose mode.
    *sni_name* and *session* are passed to client_context.wrap_socket().

    Returns a dict with details of the client connection (compression,
    cipher, peer certificate, ALPN/NPN protocol, version, session
    information) and what the server recorded (ALPN/NPN protocols and
    shared ciphers).
    """
    stats = {}
    # Forward connectionchatty instead of hard-coding False, so that the
    # caller's setting also controls the server's per-connection tracing.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=connectionchatty)
    with server:
        with client_context.wrap_socket(socket.socket(),
                server_hostname=sni_name, session=session) as s:
            s.connect((HOST, server.port))
            for arg in [indata, bytearray(indata), memoryview(indata)]:
                if connectionchatty and support.verbose:
                    # Show the object actually being written.
                    sys.stdout.write(
                        " client: sending %r...\n" % arg)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # b"over" tells the echo server to close this connection.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Collect the client-side details while the socket is still open.
            stats.update({
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
                'session_reused': s.session_reused,
                'session': s.session,
            })
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002602
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Run one client/server handshake and check its outcome.

    A client context built for *client_protocol* connects to a server
    context built for *server_protocol*.  *expect_success* selects the
    check: a false value means the connection must fail, True means it
    must succeed, and a string means it must succeed and negotiate
    exactly that protocol version.  *certsreqs* is the verify mode applied
    to both contexts (CERT_NONE when omitted); *server_options* and
    *client_options* are OR-ed into the respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # The lookup also rejects unknown verify modes with a KeyError.
    verify_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = verify_mode_names[certsreqs]
    if support.verbose:
        # Braces in the output mark combinations expected to fail.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(template % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error; both are fine when failure is
    # the expected outcome.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    # The handshake went through: verify that this was wanted, and that
    # the negotiated version matches when a specific one was requested.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is True:
        return
    if expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2661
2662
2663class ThreadedTests(unittest.TestCase):
2664
2665 @skip_if_broken_ubuntu_ssl
2666 def test_echo(self):
2667 """Basic test of an SSL client connecting to a server"""
2668 if support.verbose:
2669 sys.stdout.write("\n")
2670 for protocol in PROTOCOLS:
2671 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2672 continue
2673 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2674 context = ssl.SSLContext(protocol)
2675 context.load_cert_chain(CERTFILE)
2676 server_params_test(context, context,
2677 chatty=True, connectionchatty=True)
2678
Christian Heimesa170fa12017-09-15 20:27:30 +02002679 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002680
2681 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2682 server_params_test(client_context=client_context,
2683 server_context=server_context,
2684 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002685 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002686
2687 client_context.check_hostname = False
2688 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2689 with self.assertRaises(ssl.SSLError) as e:
2690 server_params_test(client_context=server_context,
2691 server_context=client_context,
2692 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002693 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002694 self.assertIn('called a function you should not call',
2695 str(e.exception))
2696
2697 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2698 with self.assertRaises(ssl.SSLError) as e:
2699 server_params_test(client_context=server_context,
2700 server_context=server_context,
2701 chatty=True, connectionchatty=True)
2702 self.assertIn('called a function you should not call',
2703 str(e.exception))
2704
2705 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2706 with self.assertRaises(ssl.SSLError) as e:
2707 server_params_test(client_context=server_context,
2708 server_context=client_context,
2709 chatty=True, connectionchatty=True)
2710 self.assertIn('called a function you should not call',
2711 str(e.exception))
2712
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002713 def test_getpeercert(self):
2714 if support.verbose:
2715 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002716
2717 client_context, server_context, hostname = testing_context()
2718 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002719 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002720 with client_context.wrap_socket(socket.socket(),
2721 do_handshake_on_connect=False,
2722 server_hostname=hostname) as s:
2723 s.connect((HOST, server.port))
2724 # getpeercert() raise ValueError while the handshake isn't
2725 # done.
2726 with self.assertRaises(ValueError):
2727 s.getpeercert()
2728 s.do_handshake()
2729 cert = s.getpeercert()
2730 self.assertTrue(cert, "Can't get peer certificate.")
2731 cipher = s.cipher()
2732 if support.verbose:
2733 sys.stdout.write(pprint.pformat(cert) + '\n')
2734 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2735 if 'subject' not in cert:
2736 self.fail("No subject field in certificate: %s." %
2737 pprint.pformat(cert))
2738 if ((('organizationName', 'Python Software Foundation'),)
2739 not in cert['subject']):
2740 self.fail(
2741 "Missing or invalid 'organizationName' field in certificate subject; "
2742 "should be 'Python Software Foundation'.")
2743 self.assertIn('notBefore', cert)
2744 self.assertIn('notAfter', cert)
2745 before = ssl.cert_time_to_seconds(cert['notBefore'])
2746 after = ssl.cert_time_to_seconds(cert['notAfter'])
2747 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002748
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002749 @unittest.skipUnless(have_verify_flags(),
2750 "verify_flags need OpenSSL > 0.9.8")
2751 def test_crl_check(self):
2752 if support.verbose:
2753 sys.stdout.write("\n")
2754
Christian Heimesa170fa12017-09-15 20:27:30 +02002755 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002756
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002757 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002758 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002759
2760 # VERIFY_DEFAULT should pass
2761 server = ThreadedEchoServer(context=server_context, chatty=True)
2762 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002763 with client_context.wrap_socket(socket.socket(),
2764 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002765 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002766 cert = s.getpeercert()
2767 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002768
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002770 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002771
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002772 server = ThreadedEchoServer(context=server_context, chatty=True)
2773 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002774 with client_context.wrap_socket(socket.socket(),
2775 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002776 with self.assertRaisesRegex(ssl.SSLError,
2777 "certificate verify failed"):
2778 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002779
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002781 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002782
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002783 server = ThreadedEchoServer(context=server_context, chatty=True)
2784 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002785 with client_context.wrap_socket(socket.socket(),
2786 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002787 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002788 cert = s.getpeercert()
2789 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002790
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002791 def test_check_hostname(self):
2792 if support.verbose:
2793 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002794
Christian Heimesa170fa12017-09-15 20:27:30 +02002795 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002796
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002797 # correct hostname should verify
2798 server = ThreadedEchoServer(context=server_context, chatty=True)
2799 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002800 with client_context.wrap_socket(socket.socket(),
2801 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002802 s.connect((HOST, server.port))
2803 cert = s.getpeercert()
2804 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002805
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002806 # incorrect hostname should raise an exception
2807 server = ThreadedEchoServer(context=server_context, chatty=True)
2808 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002809 with client_context.wrap_socket(socket.socket(),
2810 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002811 with self.assertRaisesRegex(
2812 ssl.CertificateError,
2813 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002814 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002815
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002816 # missing server_hostname arg should cause an exception, too
2817 server = ThreadedEchoServer(context=server_context, chatty=True)
2818 with server:
2819 with socket.socket() as s:
2820 with self.assertRaisesRegex(ValueError,
2821 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002822 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002823
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002824 def test_ecc_cert(self):
2825 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2826 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002827 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002828 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2829
2830 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2831 # load ECC cert
2832 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2833
2834 # correct hostname should verify
2835 server = ThreadedEchoServer(context=server_context, chatty=True)
2836 with server:
2837 with client_context.wrap_socket(socket.socket(),
2838 server_hostname=hostname) as s:
2839 s.connect((HOST, server.port))
2840 cert = s.getpeercert()
2841 self.assertTrue(cert, "Can't get peer certificate.")
2842 cipher = s.cipher()[0].split('-')
2843 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2844
2845 def test_dual_rsa_ecc(self):
2846 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2847 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002848 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2849 # algorithms.
2850 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002851 # only ECDSA certs
2852 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2853 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2854
2855 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2856 # load ECC and RSA key/cert pairs
2857 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2858 server_context.load_cert_chain(SIGNED_CERTFILE)
2859
2860 # correct hostname should verify
2861 server = ThreadedEchoServer(context=server_context, chatty=True)
2862 with server:
2863 with client_context.wrap_socket(socket.socket(),
2864 server_hostname=hostname) as s:
2865 s.connect((HOST, server.port))
2866 cert = s.getpeercert()
2867 self.assertTrue(cert, "Can't get peer certificate.")
2868 cipher = s.cipher()[0].split('-')
2869 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2870
Christian Heimes66e57422018-01-29 14:25:13 +01002871 def test_check_hostname_idn(self):
2872 if support.verbose:
2873 sys.stdout.write("\n")
2874
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002875 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002876 server_context.load_cert_chain(IDNSANSFILE)
2877
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002878 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002879 context.verify_mode = ssl.CERT_REQUIRED
2880 context.check_hostname = True
2881 context.load_verify_locations(SIGNING_CA)
2882
2883 # correct hostname should verify, when specified in several
2884 # different ways
2885 idn_hostnames = [
2886 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002887 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002888 ('xn--knig-5qa.idn.pythontest.net',
2889 'xn--knig-5qa.idn.pythontest.net'),
2890 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002891 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002892
2893 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002894 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002895 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2896 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2897 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002898 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2899
2900 # ('königsgäßchen.idna2008.pythontest.net',
2901 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2902 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2903 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2904 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2905 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2906
Christian Heimes66e57422018-01-29 14:25:13 +01002907 ]
2908 for server_hostname, expected_hostname in idn_hostnames:
2909 server = ThreadedEchoServer(context=server_context, chatty=True)
2910 with server:
2911 with context.wrap_socket(socket.socket(),
2912 server_hostname=server_hostname) as s:
2913 self.assertEqual(s.server_hostname, expected_hostname)
2914 s.connect((HOST, server.port))
2915 cert = s.getpeercert()
2916 self.assertEqual(s.server_hostname, expected_hostname)
2917 self.assertTrue(cert, "Can't get peer certificate.")
2918
Christian Heimes66e57422018-01-29 14:25:13 +01002919 # incorrect hostname should raise an exception
2920 server = ThreadedEchoServer(context=server_context, chatty=True)
2921 with server:
2922 with context.wrap_socket(socket.socket(),
2923 server_hostname="python.example.org") as s:
2924 with self.assertRaises(ssl.CertificateError):
2925 s.connect((HOST, server.port))
2926
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002927 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002928 """Connecting when the server rejects the client's certificate
2929
2930 Launch a server with CERT_REQUIRED, and check that trying to
2931 connect to it with a wrong client certificate fails.
2932 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002933 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002934 # load client cert that is not signed by trusted CA
2935 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002936 # require TLS client authentication
2937 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002938 # TLS 1.3 has different handshake
2939 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002940
2941 server = ThreadedEchoServer(
2942 context=server_context, chatty=True, connectionchatty=True,
2943 )
2944
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002945 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002946 client_context.wrap_socket(socket.socket(),
2947 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002948 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 # Expect either an SSL error about the server rejecting
2950 # the connection, or a low-level connection reset (which
2951 # sometimes happens on Windows)
2952 s.connect((HOST, server.port))
2953 except ssl.SSLError as e:
2954 if support.verbose:
2955 sys.stdout.write("\nSSLError is %r\n" % e)
2956 except OSError as e:
2957 if e.errno != errno.ECONNRESET:
2958 raise
2959 if support.verbose:
2960 sys.stdout.write("\nsocket.error is %r\n" % e)
2961 else:
2962 self.fail("Use of invalid cert should have failed!")
2963
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002964 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2965 def test_wrong_cert_tls13(self):
2966 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002967 # load client cert that is not signed by trusted CA
2968 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002969 server_context.verify_mode = ssl.CERT_REQUIRED
2970 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2971 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2972
2973 server = ThreadedEchoServer(
2974 context=server_context, chatty=True, connectionchatty=True,
2975 )
2976 with server, \
2977 client_context.wrap_socket(socket.socket(),
2978 server_hostname=hostname) as s:
2979 # TLS 1.3 perform client cert exchange after handshake
2980 s.connect((HOST, server.port))
2981 try:
2982 s.write(b'data')
2983 s.read(4)
2984 except ssl.SSLError as e:
2985 if support.verbose:
2986 sys.stdout.write("\nSSLError is %r\n" % e)
2987 except OSError as e:
2988 if e.errno != errno.ECONNRESET:
2989 raise
2990 if support.verbose:
2991 sys.stdout.write("\nsocket.error is %r\n" % e)
2992 else:
2993 self.fail("Use of invalid cert should have failed!")
2994
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 def test_rude_shutdown(self):
2996 """A brutal shutdown of an SSL server should raise an OSError
2997 in the client when attempting handshake.
2998 """
2999 listener_ready = threading.Event()
3000 listener_gone = threading.Event()
3001
3002 s = socket.socket()
3003 port = support.bind_port(s, HOST)
3004
3005 # `listener` runs in a thread. It sits in an accept() until
3006 # the main thread connects. Then it rudely closes the socket,
3007 # and sets Event `listener_gone` to let the main thread know
3008 # the socket is gone.
3009 def listener():
3010 s.listen()
3011 listener_ready.set()
3012 newsock, addr = s.accept()
3013 newsock.close()
3014 s.close()
3015 listener_gone.set()
3016
3017 def connector():
3018 listener_ready.wait()
3019 with socket.socket() as c:
3020 c.connect((HOST, port))
3021 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003022 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003023 ssl_sock = test_wrap_socket(c)
3024 except OSError:
3025 pass
3026 else:
3027 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003028
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003029 t = threading.Thread(target=listener)
3030 t.start()
3031 try:
3032 connector()
3033 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003034 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003035
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003036 def test_ssl_cert_verify_error(self):
3037 if support.verbose:
3038 sys.stdout.write("\n")
3039
3040 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3041 server_context.load_cert_chain(SIGNED_CERTFILE)
3042
3043 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3044
3045 server = ThreadedEchoServer(context=server_context, chatty=True)
3046 with server:
3047 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003048 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003049 try:
3050 s.connect((HOST, server.port))
3051 except ssl.SSLError as e:
3052 msg = 'unable to get local issuer certificate'
3053 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3054 self.assertEqual(e.verify_code, 20)
3055 self.assertEqual(e.verify_message, msg)
3056 self.assertIn(msg, repr(e))
3057 self.assertIn('certificate verify failed', repr(e))
3058
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003059 @skip_if_broken_ubuntu_ssl
3060 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3061 "OpenSSL is compiled without SSLv2 support")
3062 def test_protocol_sslv2(self):
3063 """Connecting to an SSLv2 server with various client options"""
3064 if support.verbose:
3065 sys.stdout.write("\n")
3066 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3067 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3068 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003069 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003070 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3071 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3072 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3073 # SSLv23 client with specific SSL options
3074 if no_sslv2_implies_sslv3_hello():
3075 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003076 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003077 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003078 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003079 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003080 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003081 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003082
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003083 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02003084 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003085 """Connecting to an SSLv23 server with various client options"""
3086 if support.verbose:
3087 sys.stdout.write("\n")
3088 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003089 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003090 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003091 except OSError as x:
3092 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3093 if support.verbose:
3094 sys.stdout.write(
3095 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3096 % str(x))
3097 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003098 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3099 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3100 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003101
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003102 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003103 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3104 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3105 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003106
3107 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003108 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3109 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3110 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003111
3112 # Server with specific SSL options
3113 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003114 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003115 server_options=ssl.OP_NO_SSLv3)
3116 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003117 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003118 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003119 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003120 server_options=ssl.OP_NO_TLSv1)
3121
3122
3123 @skip_if_broken_ubuntu_ssl
3124 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3125 "OpenSSL is compiled without SSLv3 support")
3126 def test_protocol_sslv3(self):
3127 """Connecting to an SSLv3 server with various client options"""
3128 if support.verbose:
3129 sys.stdout.write("\n")
3130 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3131 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3132 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3133 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3134 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003135 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003136 client_options=ssl.OP_NO_SSLv3)
3137 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3138 if no_sslv2_implies_sslv3_hello():
3139 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003140 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003141 False, client_options=ssl.OP_NO_SSLv2)
3142
3143 @skip_if_broken_ubuntu_ssl
3144 def test_protocol_tlsv1(self):
3145 """Connecting to a TLSv1 server with various client options"""
3146 if support.verbose:
3147 sys.stdout.write("\n")
3148 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3149 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3150 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3151 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3152 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3153 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3154 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003155 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003156 client_options=ssl.OP_NO_TLSv1)
3157
3158 @skip_if_broken_ubuntu_ssl
3159 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3160 "TLS version 1.1 not supported.")
3161 def test_protocol_tlsv1_1(self):
3162 """Connecting to a TLSv1.1 server with various client options.
3163 Testing against older TLS versions."""
3164 if support.verbose:
3165 sys.stdout.write("\n")
3166 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3167 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3168 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3169 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3170 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003171 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003172 client_options=ssl.OP_NO_TLSv1_1)
3173
Christian Heimesa170fa12017-09-15 20:27:30 +02003174 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003175 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3176 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003178 @skip_if_broken_ubuntu_ssl
3179 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3180 "TLS version 1.2 not supported.")
3181 def test_protocol_tlsv1_2(self):
3182 """Connecting to a TLSv1.2 server with various client options.
3183 Testing against older TLS versions."""
3184 if support.verbose:
3185 sys.stdout.write("\n")
3186 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3187 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3188 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3189 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3190 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3191 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3192 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003193 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003194 client_options=ssl.OP_NO_TLSv1_2)
3195
Christian Heimesa170fa12017-09-15 20:27:30 +02003196 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003197 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3198 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3199 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3200 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3201
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    script = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
              b"ENDTLS", b"msg 5", b"msg 6")

    def note(text):
        # Progress output is only wanted in verbose runs.
        if support.verbose:
            sys.stdout.write(text)

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # The TLS wrapper while the conversation is encrypted, None while it
    # is in clear text.
    tls = None
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        note("\n")
        for indata in script:
            note(" client: sending %r...\n" % indata)
            if tls is None:
                plain.send(indata)
                outdata = plain.recv(1024)
            else:
                tls.write(indata)
                outdata = tls.read()
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                note(" client: read %r from server, starting TLS...\n"
                     % msg)
                tls = test_wrap_socket(plain)
            elif indata == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                note(" client: read %r from server, ending TLS...\n"
                     % msg)
                plain = tls.unwrap()
                tls = None
            else:
                note(" client: read %r from server\n" % msg)
        note(" client: closing connection.\n")
        # Say goodbye and close over whichever channel is active.
        if tls is None:
            plain.send(b"over\n")
            plain.close()
        else:
            tls.write(b"over\n")
            tls.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003258
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    # Reference copy of the file the HTTPS server is asked to serve.
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Must be bytes like d1: a str sentinel can never compare equal and
    # makes the final comparison raise BytesWarning under ``python -bb``
    # instead of reporting a clean test failure.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response object is a context manager; it is closed on exit even
    # if reading fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003284
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    def note(text):
        # Progress output is only wanted in verbose runs.
        if support.verbose:
            sys.stdout.write(text)

    note("\n")

    indata = b"FOO\n"
    # The reply is compared against the lowercased request.
    expected = indata.lower()
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        note(" client: sending %r...\n" % indata)
        client.write(indata)
        outdata = client.read()
        note(" client: read %r\n" % outdata)
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   expected[:20], len(indata)))
        client.write(b"over\n")
        note(" client: closing connection.\n")
        client.close()
        note(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003313
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send*/recv* variant of the client SSL socket is tried against
    the echo server, and the reply is compared with the lowercased data
    that was sent.  A ValueError is tolerated only for the variants
    flagged as not expected to succeed, and its message must start with
    the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the ``!r`` / ``!s``
        # conversions.  ``{x:r}`` is a format *spec*, which bytes and
        # exception objects reject with TypeError, so a genuine failure
        # would have crashed while building its own message.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the request was already sent, so the
                # echoed reply is still pending on the connection.
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3450
def test_recv_zero(self):
    # Zero-length reads on an SSL socket return immediately with no data
    # and do not consume what the peer has sent.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ takes (exc_type, exc_value, traceback); pass all three so
    # the cleanup does not depend on a lenient __exit__ signature.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    # The plain socket and its SSL wrapper each get their own cleanup.
    s = test_wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
3470
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise a "want read/write" error
    # instead of blocking once nothing more can be sent.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Now read all the output and discard it
        client.setblocking(True)
        client.close()
3500
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    # Set to True by the main thread to make the accept loop exit.
    finish = False

    def serve():
        # Accept TCP connections but never perform a TLS handshake.
        listener.listen()
        ready.set()
        accepted = []
        while not finish:
            readable = select.select([listener], [], [], 0.1)[0]
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Case 1: connect first, then wrap the connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect the wrapped socket.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        listener.close()
3549
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    server = context.wrap_socket(listener, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    # Filled in by the server thread once a connection is accepted.
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Block on the accept and wait on the connection to close.
        listening.set()
        remote, peer = server.accept()
        # Echo four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3590
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but never connected socket must fail
    # with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3597
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but never connected socket must fail
    # with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3604
def test_no_shared_ciphers(self):
    # A handshake between peers with disjoint cipher lists must fail and
    # the server must record a "no shared cipher" error.
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    client_context.set_ciphers("AES128")
    server_context.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            with self.assertRaises(OSError):
                client.connect((HOST, server.port))
    # Checked after the server context manager has exited.
    self.assertIn("no shared cipher", server.conn_errors[0])
3618
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is no SSL object and no version.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # After the socket is closed both are None again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3642
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled on both ends, the connection must use
    # TLS 1.3 and one of its cipher suites.
    tls13_ciphers = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    context.options |= older_versions_off
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=context) as server:
        client = context.wrap_socket(socket.socket())
        with client:
            client.connect((HOST, server.port))
            self.assertIn(client.cipher()[0], tls13_ciphers)
            self.assertEqual(client.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003660
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Negotiation must respect minimum_version/maximum_version on both
    # the client and the server context.
    V = ssl.TLSVersion
    client_context, server_context, hostname = testing_context()

    # client TLSv1.0 to 1.2
    client_context.minimum_version = V.TLSv1
    client_context.maximum_version = V.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = V.TLSv1_2
    server_context.maximum_version = V.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = V.TLSv1
    server_context.maximum_version = V.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = V.TLSv1_2
    server_context.maximum_version = V.TLSv1_2
    client_context.minimum_version = V.TLSv1
    client_context.maximum_version = V.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            with self.assertRaises(ssl.SSLError) as caught:
                client.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3699
3700
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 connects to a server whose minimum
    # version has been lowered to SSLv3.
    sslv3 = ssl.TLSVersion.SSLv3
    client_context, server_context, hostname = testing_context()
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'SSLv3')
3714
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        client = context.wrap_socket(socket.socket())
        with client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3734
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def check_binding(s, data):
        # Sanity-check the binding data of connection `s` and compare it
        # with what the server reports for the same connection.
        self.assertIsNotNone(data)
        if s.version() == 'TLSv1.3':
            self.assertEqual(len(data), 48)
        else:
            self.assertEqual(len(data), 12)  # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(data).encode("us-ascii"))

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            check_binding(s, cb_data)

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Validate the data of *this* connection; the checks here
            # used to look at the first connection's cb_data again.
            check_binding(s, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003794
def test_compression(self):
    # The negotiated compression is either absent or a known method.
    allowed = {None, 'ZLIB', 'RLE'}
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, allowed)
3803
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both ends no compression is reported.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3814
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the reported cipher tuple: the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole name; `cipher[0]` would only be its first
        # character because `cipher` is already a string.
        self.fail("Non-DH cipher: " + cipher)
3830
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Handshakes with an explicitly selected ECDH curve on one side
    # succeed; a curve mismatch is expected to be rejected.
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_old_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        rejected = True
    else:
        rejected = False
    # OpenSSL 1.0.2 does not fail although it should.
    if not rejected and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3869
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3877
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server advertises protocols; the client offers none.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3887
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client offer, both ends must report the expected
    # negotiated ALPN protocol.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # On a handshake failure keep the exception in place of the stats.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as exc:
            stats = exc

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two remaining %s placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3927
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3935
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client offer, both ends must report the expected
    # negotiated NPN protocol.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        # The two remaining %s placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3961
def sni_contexts(self):
    """Build the contexts shared by the SNI callback tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    two TLS server contexts loaded with SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively, and a TLS client context that uses
    SIGNING_CA as its verify location.
    """
    def server_context_for(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_context_for(SIGNED_CERTFILE)
    alternate = server_context_for(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
3970
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has common name *name*.

    *stats* must map 'peercert' to a certificate dict whose 'subject'
    entry is a sequence of relative distinguished names.
    """
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
3974
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()

    # Hostname matching is disabled on the client for this test; the
    # served certificate is identified through its common name instead.
    client_context.check_hostname = False

    seen = []

    def record_and_switch(ssl_sock, server_name, initial_context):
        # Log every invocation, and move the connection to other_context
        # only when the client actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    def handshake(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    server_context.set_servername_callback(record_and_switch)

    # With SNI: the callback sees the requested name once, and the
    # connection is served from other_context, whose certificate has
    # the common name 'fakehostname'.
    stats = handshake('supermessage')
    self.assertEqual(seen, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without SNI: the callback still runs, with server_name=None, and
    # the original server certificate is kept.
    seen.clear()
    stats = handshake(None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # With the callback removed: nothing is recorded and the
    # certificate does not change, even though a name is sent.
    seen.clear()
    server_context.set_servername_callback(None)
    stats = handshake('notfunny')
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4015
@needs_sni
def test_sni_callback_alert(self):
    # An alert description returned by the servername callback must
    # surface on the connecting client as an SSLError with that reason.
    server_context, _unused_context, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004029
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback must fail the
    # connection with a handshake-failure alert and show up on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def exploding_callback(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(exploding_callback)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004046
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback that returns something of the wrong type
    # must end the connection with an internal-error alert, and a
    # TypeError must be reported on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def returns_string(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(returns_string)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004064
def test_shared_ciphers(self):
    # The client offers AES128 and AES256 while the server only allows
    # AES256, so every cipher the server reports as shared must match
    # one of the fragments below.
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    acceptable = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    stats = server_params_test(client_ctx, server_ctx,
                               sni_name=sni)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _version, _bits in shared:
        for fragment in acceptable:
            if fragment in cipher_name:
                break
        else:
            # No acceptable fragment matched this cipher name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004082
def test_read_write_after_close_raises_valuerror(self):
    # Once a connected SSL socket has been closed, both read() and
    # write() on it must raise ValueError.
    client_ctx, server_ctx, sni = testing_context()
    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)

    with echo_server:
        raw_sock = socket.socket()
        tls_sock = client_ctx.wrap_socket(raw_sock, server_hostname=sni)
        tls_sock.connect((HOST, echo_server.port))
        tls_sock.close()

        with self.assertRaises(ValueError):
            tls_sock.read(1024)
        with self.assertRaises(ValueError):
            tls_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004095
def test_sendfile(self):
    # Send a small file through SSLSocket.sendfile() to the echo server
    # and check that the same bytes come back.
    TEST_DATA = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(TEST_DATA)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context is used for both the server and the client socket.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    echo_server = ThreadedEchoServer(context=context, chatty=False)
    with echo_server, context.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as source:
            conn.sendfile(source)
        echoed = conn.recv(1024)
        self.assertEqual(echoed, TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004112
def test_session(self):
    # Run four handshakes against the same server context, alternating
    # between fresh and resumed sessions, and check both the session
    # objects seen by the client and the server's session_stats()
    # counters. The counter assertions depend on the handshakes
    # happening in exactly this order.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    # first connection without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    # the lifetime hint is only checked on OpenSSL newer than 1.0.1
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    sess_stat = server_context.session_stats()
    self.assertEqual(sess_stat['accept'], 1)
    self.assertEqual(sess_stat['hits'], 0)

    # reuse session
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    sess_stat = server_context.session_stats()
    self.assertEqual(sess_stat['accept'], 2)
    self.assertEqual(sess_stat['hits'], 1)
    self.assertTrue(stats['session_reused'])
    session2 = stats['session']
    # the resumed session compares equal to the first one but is a
    # distinct Python object
    self.assertEqual(session2.id, session.id)
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    self.assertFalse(stats['session_reused'])
    session3 = stats['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    sess_stat = server_context.session_stats()
    # a full handshake counts as an accept but not as a hit
    self.assertEqual(sess_stat['accept'], 3)
    self.assertEqual(sess_stat['hits'], 1)

    # reuse session again
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    self.assertTrue(stats['session_reused'])
    session4 = stats['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    sess_stat = server_context.session_stats()
    self.assertEqual(sess_stat['accept'], 4)
    self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004170
def test_session_handling(self):
    # Check the rules for reading and assigning SSLSocket.session on
    # client sockets talking to a single echo server: what it holds
    # before the handshake, which values may be assigned, and when.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            # only SSLSession objects may be assigned
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext; the
            # assignment itself must raise, so it is the only statement
            # guarded and no connection is attempted afterwards
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004220
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004221
# Tests for TLS 1.3 post-handshake authentication (PHA). Most tests connect
# a client to a ThreadedEchoServer and drive it with short text commands
# (b'HASCERT', b'PHA', b'GETCERT'), asserting on each reply in turn, so the
# order of the write/recv pairs is part of what is being tested.
# NOTE(review): the command handling lives in the echo server, outside this
# class; the per-command notes below are inferred from the expected replies.
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    def test_pha_setter(self):
        # post_handshake_auth defaults to False on every TLS protocol
        # constant and can be toggled independently of verify_mode.
        protocols = [
            ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
        ]
        for protocol in protocols:
            ctx = ssl.SSLContext(protocol)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            # changing verify_mode must not reset post_handshake_auth
            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            # and disabling post_handshake_auth must not reset verify_mode
            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        # Both sides enable PHA, the server requires a certificate and
        # the client has one loaded.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # no client certificate is reported before PHA is requested
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'FALSE\n')
                s.write(b'PHA')
                self.assertEqual(s.recv(1024), b'OK\n')
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'TRUE\n')
                # PHA method just returns true when cert is already available
                s.write(b'PHA')
                self.assertEqual(s.recv(1024), b'OK\n')
                s.write(b'GETCERT')
                cert_text = s.recv(4096).decode('us-ascii')
                self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        # Same as test_pha_required, but the client has no certificate
        # loaded, so the connection must end with an alert.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                s.write(b'PHA')
                # receive CertificateRequest
                self.assertEqual(s.recv(1024), b'OK\n')
                # send empty Certificate + Finish
                s.write(b'HASCERT')
                # receive alert
                with self.assertRaisesRegex(
                        ssl.SSLError,
                        'tlsv13 alert certificate required'):
                    s.recv(1024)

    def test_pha_optional(self):
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        # (this replaces the CERT_REQUIRED mode assigned a few lines above)
        server_context.verify_mode = ssl.CERT_OPTIONAL
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'FALSE\n')
                s.write(b'PHA')
                self.assertEqual(s.recv(1024), b'OK\n')
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'TRUE\n')

    def test_pha_optional_nocert(self):
        if support.verbose:
            sys.stdout.write("\n")

        # The server only asks for an optional certificate and the
        # client has none loaded.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'FALSE\n')
                s.write(b'PHA')
                self.assertEqual(s.recv(1024), b'OK\n')
                # optional doesn't fail when client does not have a cert
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'FALSE\n')

    def test_pha_no_pha_client(self):
        # The server enables PHA but the client context leaves
        # post_handshake_auth at its default.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # calling verify_client_post_handshake() on the client
                # socket is rejected
                with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                    s.verify_client_post_handshake()
                # the reply to a PHA request carries an error text
                # instead of OK
                s.write(b'PHA')
                self.assertIn(b'extension not received', s.recv(1024))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'TRUE\n')
                # PHA doesn't fail if there is already a cert
                s.write(b'PHA')
                self.assertEqual(s.recv(1024), b'OK\n')
                s.write(b'HASCERT')
                self.assertEqual(s.recv(1024), b'TRUE\n')

    def test_pha_not_tls13(self):
        # TLS 1.2
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        # cap the client at TLS 1.2 so the connection cannot use TLS 1.3
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                # PHA fails for TLS != 1.3
                s.write(b'PHA')
                self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4393
4394
def test_main(verbose=False):
    """Run the SSL test classes, printing a banner first in verbose mode.

    The *verbose* argument is not consulted by this function; the banner
    is printed when ``support.verbose`` is true. NetworkedTests is added
    only when the 'network' resource is enabled.

    Raises support.TestFailed if any required certificate file is missing.
    """
    if support.verbose:
        import warnings
        # platform.linux_distribution() is deprecated and absent from
        # newer Python versions, so look it up defensively instead of
        # failing with AttributeError before any test runs.
        linux_distribution = getattr(platform, 'linux_distribution', None)
        probes = []
        if linux_distribution is not None:
            probes.append(('Linux', linux_distribution))
        probes.append(('Mac', platform.mac_ver))
        probes.append(('Windows', platform.win32_ver))
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                DeprecationWarning,
            )
            # The first probe that returns a non-empty leading field
            # names the platform; otherwise use platform.platform().
            for name, func in probes:
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # the constant is not available in this ssl build
            pass

    # Fail early, with a clear message, when a fixture file is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Snapshot thread state so it is cleaned up even if the run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004449
# Run the whole suite when this module is executed directly as a script.
if __name__ == "__main__":
    test_main()