blob: 38ecddd07d98c04158cb7ed986036119eb495d69 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Miss Islington (bot)be5de952019-01-15 15:03:36 -0800119TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Miss Islington (bot)e3228a32018-08-14 10:52:27 -0400121DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
Thomas Woutersed03b412007-08-28 21:37:11 +0000132def handle_error(prefix):
133 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000134 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000135 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitroub5218772010-05-21 09:56:06 +0000137def can_clear_options():
138 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200139 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
141def no_sslv2_implies_sslv3_hello():
142 # 0.9.7h or higher
143 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
144
Christian Heimes2427b502013-11-23 11:24:32 +0100145def have_verify_flags():
146 # 0.9.8 or higher
147 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
148
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800149def _have_secp_curves():
150 if not ssl.HAS_ECDH:
151 return False
152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
153 try:
154 ctx.set_ecdh_curve("secp384r1")
155 except ValueError:
156 return False
157 else:
158 return True
159
160
161HAVE_SECP_CURVES = _have_secp_curves()
162
163
Antoine Pitrouc695c952014-04-28 20:57:36 +0200164def utc_offset(): #NOTE: ignore issues like #1647654
165 # local time = utc time + utc offset
166 if time.daylight and time.localtime().tm_isdst > 0:
167 return -time.altzone # seconds
168 return -time.timezone
169
Christian Heimes9424bb42013-06-17 15:32:57 +0200170def asn1time(cert_time):
171 # Some versions of OpenSSL ignore seconds, see #18207
172 # 0.9.8.i
173 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
174 fmt = "%b %d %H:%M:%S %Y GMT"
175 dt = datetime.datetime.strptime(cert_time, fmt)
176 dt = dt.replace(second=0)
177 cert_time = dt.strftime(fmt)
178 # %d adds leading zero but ASN1_TIME_print() uses leading space
179 if cert_time[4] == "0":
180 cert_time = cert_time[:4] + " " + cert_time[5:]
181
182 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
185def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200186 if hasattr(ssl, 'PROTOCOL_SSLv2'):
187 @functools.wraps(func)
188 def f(*args, **kwargs):
189 try:
190 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
191 except ssl.SSLError:
192 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
193 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
194 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
195 return func(*args, **kwargs)
196 return f
197 else:
198 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100200needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
221def testing_context(server_cert=SIGNED_CERTFILE):
222 """Create context
223
224 client_context, server_context, hostname = testing_context()
225 """
226 if server_cert == SIGNED_CERTFILE:
227 hostname = SIGNED_CERTFILE_HOSTNAME
228 elif server_cert == SIGNED_CERTFILE2:
229 hostname = SIGNED_CERTFILE2_HOSTNAME
230 else:
231 raise ValueError(server_cert)
232
233 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
234 client_context.load_verify_locations(SIGNING_CA)
235
236 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
237 server_context.load_cert_chain(server_cert)
Christian Heimes2756ef32018-09-23 09:22:52 +0200238 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200239
240 return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
Antoine Pitrou480a1242010-04-28 21:37:09 +0000279 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000282 sys.stdout.write("\n RAND_status is %d (%s)\n"
283 % (v, (v and "sufficient randomness") or
284 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
286 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
287 self.assertEqual(len(data), 16)
288 self.assertEqual(is_cryptographic, v == 1)
289 if v:
290 data = ssl.RAND_bytes(16)
291 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200292 else:
293 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200294
Victor Stinner1e81a392013-12-19 16:47:04 +0100295 # negative num is invalid
296 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
297 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
298
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100299 if hasattr(ssl, 'RAND_egd'):
300 self.assertRaises(TypeError, ssl.RAND_egd, 1)
301 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000302 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200303 ssl.RAND_add(b"this is a random bytes object", 75.0)
304 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
Christian Heimesf77b4b22013-08-21 13:26:05 +0200306 @unittest.skipUnless(os.name == 'posix', 'requires posix')
307 def test_random_fork(self):
308 status = ssl.RAND_status()
309 if not status:
310 self.fail("OpenSSL's PRNG has insufficient randomness")
311
312 rfd, wfd = os.pipe()
313 pid = os.fork()
314 if pid == 0:
315 try:
316 os.close(rfd)
317 child_random = ssl.RAND_pseudo_bytes(16)[0]
318 self.assertEqual(len(child_random), 16)
319 os.write(wfd, child_random)
320 os.close(wfd)
321 except BaseException:
322 os._exit(1)
323 else:
324 os._exit(0)
325 else:
326 os.close(wfd)
327 self.addCleanup(os.close, rfd)
328 _, status = os.waitpid(pid, 0)
329 self.assertEqual(status, 0)
330
331 child_random = os.read(rfd, 16)
332 self.assertEqual(len(child_random), 16)
333 parent_random = ssl.RAND_pseudo_bytes(16)[0]
334 self.assertEqual(len(parent_random), 16)
335
336 self.assertNotEqual(child_random, parent_random)
337
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -0700338 maxDiff = None
339
Antoine Pitrou480a1242010-04-28 21:37:09 +0000340 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000341 # note that this uses an 'unofficial' function in _ssl.c,
342 # provided solely for this test, to exercise the certificate
343 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100344 self.assertEqual(
345 ssl._ssl._test_decode_cert(CERTFILE),
346 CERTFILE_INFO
347 )
348 self.assertEqual(
349 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
350 SIGNED_CERTFILE_INFO
351 )
352
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200353 # Issue #13034: the subjectAltName in some certificates
354 # (notably projects.developer.nokia.com:443) wasn't parsed
355 p = ssl._ssl._test_decode_cert(NOKIACERT)
356 if support.verbose:
357 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
358 self.assertEqual(p['subjectAltName'],
359 (('DNS', 'projects.developer.nokia.com'),
360 ('DNS', 'projects.forum.nokia.com'))
361 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100362 # extra OCSP and AIA fields
363 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
364 self.assertEqual(p['caIssuers'],
365 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
366 self.assertEqual(p['crlDistributionPoints'],
367 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000368
Miss Islington (bot)be5de952019-01-15 15:03:36 -0800369 def test_parse_cert_CVE_2019_5010(self):
370 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
371 if support.verbose:
372 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
373 self.assertEqual(
374 p,
375 {
376 'issuer': (
377 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
378 'notAfter': 'Jun 14 18:00:58 2028 GMT',
379 'notBefore': 'Jun 18 18:00:58 2018 GMT',
380 'serialNumber': '02',
381 'subject': ((('countryName', 'UK'),),
382 (('commonName',
383 'codenomicon-vm-2.test.lal.cisco.com'),)),
384 'subjectAltName': (
385 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
386 'version': 3
387 }
388 )
389
Christian Heimes824f7f32013-08-17 00:54:47 +0200390 def test_parse_cert_CVE_2013_4238(self):
391 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
392 if support.verbose:
393 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
394 subject = ((('countryName', 'US'),),
395 (('stateOrProvinceName', 'Oregon'),),
396 (('localityName', 'Beaverton'),),
397 (('organizationName', 'Python Software Foundation'),),
398 (('organizationalUnitName', 'Python Core Development'),),
399 (('commonName', 'null.python.org\x00example.org'),),
400 (('emailAddress', 'python-dev@python.org'),))
401 self.assertEqual(p['subject'], subject)
402 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200403 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
404 san = (('DNS', 'altnull.python.org\x00example.com'),
405 ('email', 'null@python.org\x00user@example.org'),
406 ('URI', 'http://null.python.org\x00http://example.org'),
407 ('IP Address', '192.0.2.1'),
408 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
409 else:
410 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
411 san = (('DNS', 'altnull.python.org\x00example.com'),
412 ('email', 'null@python.org\x00user@example.org'),
413 ('URI', 'http://null.python.org\x00http://example.org'),
414 ('IP Address', '192.0.2.1'),
415 ('IP Address', '<invalid>'))
416
417 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200418
Christian Heimes1c03abd2016-09-06 23:25:35 +0200419 def test_parse_all_sans(self):
420 p = ssl._ssl._test_decode_cert(ALLSANFILE)
421 self.assertEqual(p['subjectAltName'],
422 (
423 ('DNS', 'allsans'),
424 ('othername', '<unsupported>'),
425 ('othername', '<unsupported>'),
426 ('email', 'user@example.org'),
427 ('DNS', 'www.example.org'),
428 ('DirName',
429 ((('countryName', 'XY'),),
430 (('localityName', 'Castle Anthrax'),),
431 (('organizationName', 'Python Software Foundation'),),
432 (('commonName', 'dirname example'),))),
433 ('URI', 'https://www.python.org/'),
434 ('IP Address', '127.0.0.1'),
435 ('IP Address', '0:0:0:0:0:0:0:1\n'),
436 ('Registered ID', '1.2.3.4.5')
437 )
438 )
439
Antoine Pitrou480a1242010-04-28 21:37:09 +0000440 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000441 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000442 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000443 d1 = ssl.PEM_cert_to_DER_cert(pem)
444 p2 = ssl.DER_cert_to_PEM_cert(d1)
445 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000446 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000447 if not p2.startswith(ssl.PEM_HEADER + '\n'):
448 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
449 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
450 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000451
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000452 def test_openssl_version(self):
453 n = ssl.OPENSSL_VERSION_NUMBER
454 t = ssl.OPENSSL_VERSION_INFO
455 s = ssl.OPENSSL_VERSION
456 self.assertIsInstance(n, int)
457 self.assertIsInstance(t, tuple)
458 self.assertIsInstance(s, str)
459 # Some sanity checks follow
460 # >= 0.9
461 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400462 # < 3.0
463 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000464 major, minor, fix, patch, status = t
465 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400466 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000467 self.assertGreaterEqual(minor, 0)
468 self.assertLess(minor, 256)
469 self.assertGreaterEqual(fix, 0)
470 self.assertLess(fix, 256)
471 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100472 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000473 self.assertGreaterEqual(status, 0)
474 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400475 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200476 if IS_LIBRESSL:
477 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100478 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400479 else:
480 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100481 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000482
Antoine Pitrou9d543662010-04-23 23:10:32 +0000483 @support.cpython_only
484 def test_refcycle(self):
485 # Issue #7943: an SSL object doesn't create reference cycles with
486 # itself.
487 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200488 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000489 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100490 with support.check_warnings(("", ResourceWarning)):
491 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100492 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000493
Antoine Pitroua468adc2010-09-14 14:43:44 +0000494 def test_wrapped_unconnected(self):
495 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200496 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000497 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200498 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100499 self.assertRaises(OSError, ss.recv, 1)
500 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
501 self.assertRaises(OSError, ss.recvfrom, 1)
502 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
503 self.assertRaises(OSError, ss.send, b'x')
504 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800505 self.assertRaises(NotImplementedError, ss.dup)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800506 self.assertRaises(NotImplementedError, ss.sendmsg,
507 [b'x'], (), 0, ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800508 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
509 self.assertRaises(NotImplementedError, ss.recvmsg_into,
510 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000511
Antoine Pitrou40f08742010-04-24 22:04:40 +0000512 def test_timeout(self):
513 # Issue #8524: when creating an SSL socket, the timeout of the
514 # original socket should be retained.
515 for timeout in (None, 0.0, 5.0):
516 s = socket.socket(socket.AF_INET)
517 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200518 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100519 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000520
Christian Heimesd0486372016-09-10 23:23:33 +0200521 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000523 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000524 "certfile must be specified",
525 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000526 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000527 "certfile must be specified for server-side operations",
528 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000529 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000530 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200531 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100532 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
533 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200534 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200535 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000536 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000537 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000538 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200539 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000540 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000541 ssl.wrap_socket(sock,
542 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000543 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200544 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000545 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000546 ssl.wrap_socket(sock,
547 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000548 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000549
Martin Panter3464ea22016-02-01 21:58:11 +0000550 def bad_cert_test(self, certfile):
551 """Check that trying to use the given client certificate fails"""
552 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
553 certfile)
554 sock = socket.socket()
555 self.addCleanup(sock.close)
556 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200557 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200558 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000559
560 def test_empty_cert(self):
561 """Wrapping with an empty cert file"""
562 self.bad_cert_test("nullcert.pem")
563
564 def test_malformed_cert(self):
565 """Wrapping with a badly formatted certificate (syntax error)"""
566 self.bad_cert_test("badcert.pem")
567
568 def test_malformed_key(self):
569 """Wrapping with a badly formatted key (syntax error)"""
570 self.bad_cert_test("badkey.pem")
571
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000572 def test_match_hostname(self):
573 def ok(cert, hostname):
574 ssl.match_hostname(cert, hostname)
575 def fail(cert, hostname):
576 self.assertRaises(ssl.CertificateError,
577 ssl.match_hostname, cert, hostname)
578
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100579 # -- Hostname matching --
580
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000581 cert = {'subject': ((('commonName', 'example.com'),),)}
582 ok(cert, 'example.com')
583 ok(cert, 'ExAmple.cOm')
584 fail(cert, 'www.example.com')
585 fail(cert, '.example.com')
586 fail(cert, 'example.org')
587 fail(cert, 'exampleXcom')
588
589 cert = {'subject': ((('commonName', '*.a.com'),),)}
590 ok(cert, 'foo.a.com')
591 fail(cert, 'bar.foo.a.com')
592 fail(cert, 'a.com')
593 fail(cert, 'Xa.com')
594 fail(cert, '.a.com')
595
Mandeep Singhede2ac92017-11-27 04:01:27 +0530596 # only match wildcards when they are the only thing
597 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000598 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530599 fail(cert, 'foo.com')
600 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000601 fail(cert, 'bar.com')
602 fail(cert, 'foo.a.com')
603 fail(cert, 'bar.foo.com')
604
Christian Heimes824f7f32013-08-17 00:54:47 +0200605 # NULL bytes are bad, CVE-2013-4073
606 cert = {'subject': ((('commonName',
607 'null.python.org\x00example.org'),),)}
608 ok(cert, 'null.python.org\x00example.org') # or raise an error?
609 fail(cert, 'example.org')
610 fail(cert, 'null.python.org')
611
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 # error cases with wildcards
613 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
614 fail(cert, 'bar.foo.a.com')
615 fail(cert, 'a.com')
616 fail(cert, 'Xa.com')
617 fail(cert, '.a.com')
618
619 cert = {'subject': ((('commonName', 'a.*.com'),),)}
620 fail(cert, 'a.foo.com')
621 fail(cert, 'a..com')
622 fail(cert, 'a.com')
623
624 # wildcard doesn't match IDNA prefix 'xn--'
625 idna = 'püthon.python.org'.encode("idna").decode("ascii")
626 cert = {'subject': ((('commonName', idna),),)}
627 ok(cert, idna)
628 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
629 fail(cert, idna)
630 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
631 fail(cert, idna)
632
633 # wildcard in first fragment and IDNA A-labels in sequent fragments
634 # are supported.
635 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
636 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530637 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
638 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100639 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
640 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
641
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000642 # Slightly fake real-world example
643 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
644 'subject': ((('commonName', 'linuxfrz.org'),),),
645 'subjectAltName': (('DNS', 'linuxfr.org'),
646 ('DNS', 'linuxfr.com'),
647 ('othername', '<unsupported>'))}
648 ok(cert, 'linuxfr.org')
649 ok(cert, 'linuxfr.com')
650 # Not a "DNS" entry
651 fail(cert, '<unsupported>')
652 # When there is a subjectAltName, commonName isn't used
653 fail(cert, 'linuxfrz.org')
654
655 # A pristine real-world example
656 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
657 'subject': ((('countryName', 'US'),),
658 (('stateOrProvinceName', 'California'),),
659 (('localityName', 'Mountain View'),),
660 (('organizationName', 'Google Inc'),),
661 (('commonName', 'mail.google.com'),))}
662 ok(cert, 'mail.google.com')
663 fail(cert, 'gmail.com')
664 # Only commonName is considered
665 fail(cert, 'California')
666
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100667 # -- IPv4 matching --
668 cert = {'subject': ((('commonName', 'example.com'),),),
669 'subjectAltName': (('DNS', 'example.com'),
670 ('IP Address', '10.11.12.13'),
671 ('IP Address', '14.15.16.17'))}
672 ok(cert, '10.11.12.13')
673 ok(cert, '14.15.16.17')
674 fail(cert, '14.15.16.18')
675 fail(cert, 'example.net')
676
677 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800678 if hasattr(socket, 'AF_INET6'):
679 cert = {'subject': ((('commonName', 'example.com'),),),
680 'subjectAltName': (
681 ('DNS', 'example.com'),
682 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
683 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
684 ok(cert, '2001::cafe')
685 ok(cert, '2003::baba')
686 fail(cert, '2003::bebe')
687 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100688
689 # -- Miscellaneous --
690
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000691 # Neither commonName nor subjectAltName
692 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
693 'subject': ((('countryName', 'US'),),
694 (('stateOrProvinceName', 'California'),),
695 (('localityName', 'Mountain View'),),
696 (('organizationName', 'Google Inc'),))}
697 fail(cert, 'mail.google.com')
698
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200699 # No DNS entry in subjectAltName but a commonName
700 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
701 'subject': ((('countryName', 'US'),),
702 (('stateOrProvinceName', 'California'),),
703 (('localityName', 'Mountain View'),),
704 (('commonName', 'mail.google.com'),)),
705 'subjectAltName': (('othername', 'blabla'), )}
706 ok(cert, 'mail.google.com')
707
708 # No DNS entry subjectAltName and no commonName
709 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
710 'subject': ((('countryName', 'US'),),
711 (('stateOrProvinceName', 'California'),),
712 (('localityName', 'Mountain View'),),
713 (('organizationName', 'Google Inc'),)),
714 'subjectAltName': (('othername', 'blabla'),)}
715 fail(cert, 'google.com')
716
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000717 # Empty cert / no cert
718 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
719 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
720
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200721 # Issue #17980: avoid denials of service by refusing more than one
722 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800723 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
724 with self.assertRaisesRegex(
725 ssl.CertificateError,
726 "partial wildcards in leftmost label are not supported"):
727 ssl.match_hostname(cert, 'axxb.example.com')
728
729 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
730 with self.assertRaisesRegex(
731 ssl.CertificateError,
732 "wildcard can only be present in the leftmost label"):
733 ssl.match_hostname(cert, 'www.sub.example.com')
734
735 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
736 with self.assertRaisesRegex(
737 ssl.CertificateError,
738 "too many wildcards"):
739 ssl.match_hostname(cert, 'axxbxxc.example.com')
740
741 cert = {'subject': ((('commonName', '*'),),)}
742 with self.assertRaisesRegex(
743 ssl.CertificateError,
744 "sole wildcard without additional labels are not support"):
745 ssl.match_hostname(cert, 'host')
746
747 cert = {'subject': ((('commonName', '*.com'),),)}
748 with self.assertRaisesRegex(
749 ssl.CertificateError,
750 r"hostname 'com' doesn't match '\*.com'"):
751 ssl.match_hostname(cert, 'com')
752
753 # extra checks for _inet_paton()
754 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
755 with self.assertRaises(ValueError):
756 ssl._inet_paton(invalid)
757 for ipaddr in ['127.0.0.1', '192.168.0.1']:
758 self.assertTrue(ssl._inet_paton(ipaddr))
759 if hasattr(socket, 'AF_INET6'):
760 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
761 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200762
Antoine Pitroud5323212010-10-22 18:19:07 +0000763 def test_server_side(self):
764 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200765 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000766 with socket.socket() as sock:
767 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
768 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000769
Antoine Pitroud6494802011-07-21 01:11:30 +0200770 def test_unknown_channel_binding(self):
771 # should raise ValueError for unknown type
772 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200773 s.bind(('127.0.0.1', 0))
774 s.listen()
775 c = socket.socket(socket.AF_INET)
776 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200777 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100778 with self.assertRaises(ValueError):
779 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200780 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200781
782 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
783 "'tls-unique' channel binding not available")
784 def test_tls_unique_channel_binding(self):
785 # unconnected should return None for known type
786 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200787 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100788 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200789 # the same for server-side
790 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200791 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100792 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200793
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600794 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200795 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600796 r = repr(ss)
797 with self.assertWarns(ResourceWarning) as cm:
798 ss = None
799 support.gc_collect()
800 self.assertIn(r, str(cm.warning.args[0]))
801
Christian Heimes6d7ad132013-06-09 18:02:55 +0200802 def test_get_default_verify_paths(self):
803 paths = ssl.get_default_verify_paths()
804 self.assertEqual(len(paths), 6)
805 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
806
807 with support.EnvironmentVarGuard() as env:
808 env["SSL_CERT_DIR"] = CAPATH
809 env["SSL_CERT_FILE"] = CERTFILE
810 paths = ssl.get_default_verify_paths()
811 self.assertEqual(paths.cafile, CERTFILE)
812 self.assertEqual(paths.capath, CAPATH)
813
Christian Heimes44109d72013-11-22 01:51:30 +0100814 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
815 def test_enum_certificates(self):
816 self.assertTrue(ssl.enum_certificates("CA"))
817 self.assertTrue(ssl.enum_certificates("ROOT"))
818
819 self.assertRaises(TypeError, ssl.enum_certificates)
820 self.assertRaises(WindowsError, ssl.enum_certificates, "")
821
Christian Heimesc2d65e12013-11-22 16:13:55 +0100822 trust_oids = set()
823 for storename in ("CA", "ROOT"):
824 store = ssl.enum_certificates(storename)
825 self.assertIsInstance(store, list)
826 for element in store:
827 self.assertIsInstance(element, tuple)
828 self.assertEqual(len(element), 3)
829 cert, enc, trust = element
830 self.assertIsInstance(cert, bytes)
831 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
832 self.assertIsInstance(trust, (set, bool))
833 if isinstance(trust, set):
834 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100835
836 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100837 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200838
Christian Heimes46bebee2013-06-09 19:03:31 +0200839 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100840 def test_enum_crls(self):
841 self.assertTrue(ssl.enum_crls("CA"))
842 self.assertRaises(TypeError, ssl.enum_crls)
843 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200844
Christian Heimes44109d72013-11-22 01:51:30 +0100845 crls = ssl.enum_crls("CA")
846 self.assertIsInstance(crls, list)
847 for element in crls:
848 self.assertIsInstance(element, tuple)
849 self.assertEqual(len(element), 2)
850 self.assertIsInstance(element[0], bytes)
851 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200852
Christian Heimes46bebee2013-06-09 19:03:31 +0200853
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100854 def test_asn1object(self):
855 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
856 '1.3.6.1.5.5.7.3.1')
857
858 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
859 self.assertEqual(val, expected)
860 self.assertEqual(val.nid, 129)
861 self.assertEqual(val.shortname, 'serverAuth')
862 self.assertEqual(val.longname, 'TLS Web Server Authentication')
863 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
864 self.assertIsInstance(val, ssl._ASN1Object)
865 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
866
867 val = ssl._ASN1Object.fromnid(129)
868 self.assertEqual(val, expected)
869 self.assertIsInstance(val, ssl._ASN1Object)
870 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100871 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
872 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100873 for i in range(1000):
874 try:
875 obj = ssl._ASN1Object.fromnid(i)
876 except ValueError:
877 pass
878 else:
879 self.assertIsInstance(obj.nid, int)
880 self.assertIsInstance(obj.shortname, str)
881 self.assertIsInstance(obj.longname, str)
882 self.assertIsInstance(obj.oid, (str, type(None)))
883
884 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
885 self.assertEqual(val, expected)
886 self.assertIsInstance(val, ssl._ASN1Object)
887 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
888 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
889 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100890 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
891 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100892
Christian Heimes72d28502013-11-23 13:56:58 +0100893 def test_purpose_enum(self):
894 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
895 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
896 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
897 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
898 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
899 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
900 '1.3.6.1.5.5.7.3.1')
901
902 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
903 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
904 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
905 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
906 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
907 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
908 '1.3.6.1.5.5.7.3.2')
909
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100910 def test_unsupported_dtls(self):
911 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
912 self.addCleanup(s.close)
913 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200914 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100915 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200916 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100917 with self.assertRaises(NotImplementedError) as cx:
918 ctx.wrap_socket(s)
919 self.assertEqual(str(cx.exception), "only stream sockets are supported")
920
Antoine Pitrouc695c952014-04-28 20:57:36 +0200921 def cert_time_ok(self, timestring, timestamp):
922 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
923
924 def cert_time_fail(self, timestring):
925 with self.assertRaises(ValueError):
926 ssl.cert_time_to_seconds(timestring)
927
928 @unittest.skipUnless(utc_offset(),
929 'local time needs to be different from UTC')
930 def test_cert_time_to_seconds_timezone(self):
931 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
932 # results if local timezone is not UTC
933 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
934 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
935
936 def test_cert_time_to_seconds(self):
937 timestring = "Jan 5 09:34:43 2018 GMT"
938 ts = 1515144883.0
939 self.cert_time_ok(timestring, ts)
940 # accept keyword parameter, assert its name
941 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
942 # accept both %e and %d (space or zero generated by strftime)
943 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
944 # case-insensitive
945 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
946 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
947 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
948 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
949 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
950 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
951 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
952 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
953
954 newyear_ts = 1230768000.0
955 # leap seconds
956 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
957 # same timestamp
958 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
959
960 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
961 # allow 60th second (even if it is not a leap second)
962 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
963 # allow 2nd leap second for compatibility with time.strptime()
964 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
965 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
966
Mike53f7a7c2017-12-14 14:04:53 +0300967 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200968 # 99991231235959Z (rfc 5280)
969 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
970
971 @support.run_with_locale('LC_ALL', '')
972 def test_cert_time_to_seconds_locale(self):
973 # `cert_time_to_seconds()` should be locale independent
974
975 def local_february_name():
976 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
977
978 if local_february_name().lower() == 'feb':
979 self.skipTest("locale-specific month name needs to be "
980 "different from C locale")
981
982 # locale-independent
983 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
984 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
985
Martin Panter3840b2a2016-03-27 01:53:46 +0000986 def test_connect_ex_error(self):
987 server = socket.socket(socket.AF_INET)
988 self.addCleanup(server.close)
989 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200990 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000991 cert_reqs=ssl.CERT_REQUIRED)
992 self.addCleanup(s.close)
993 rc = s.connect_ex((HOST, port))
994 # Issue #19919: Windows machines or VMs hosted on Windows
995 # machines sometimes return EWOULDBLOCK.
996 errors = (
997 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
998 errno.EWOULDBLOCK,
999 )
1000 self.assertIn(rc, errors)
1001
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001002
Antoine Pitrou152efa22010-05-16 18:19:27 +00001003class ContextTests(unittest.TestCase):
1004
Antoine Pitrou23df4832010-08-04 17:14:06 +00001005 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +00001006 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001007 for protocol in PROTOCOLS:
1008 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001009 ctx = ssl.SSLContext()
1010 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001011 self.assertRaises(ValueError, ssl.SSLContext, -1)
1012 self.assertRaises(ValueError, ssl.SSLContext, 42)
1013
Antoine Pitrou23df4832010-08-04 17:14:06 +00001014 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +00001015 def test_protocol(self):
1016 for proto in PROTOCOLS:
1017 ctx = ssl.SSLContext(proto)
1018 self.assertEqual(ctx.protocol, proto)
1019
1020 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001021 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001022 ctx.set_ciphers("ALL")
1023 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001024 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001025 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026
Christian Heimes892d66e2018-01-29 14:10:18 +01001027 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1028 "Test applies only to Python default ciphers")
1029 def test_python_ciphers(self):
1030 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1031 ciphers = ctx.get_ciphers()
1032 for suite in ciphers:
1033 name = suite['name']
1034 self.assertNotIn("PSK", name)
1035 self.assertNotIn("SRP", name)
1036 self.assertNotIn("MD5", name)
1037 self.assertNotIn("RC4", name)
1038 self.assertNotIn("3DES", name)
1039
Christian Heimes25bfcd52016-09-06 00:04:45 +02001040 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1041 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001042 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001043 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001044 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001045 self.assertIn('AES256-GCM-SHA384', names)
1046 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001047
Antoine Pitrou23df4832010-08-04 17:14:06 +00001048 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001049 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001051 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001052 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001053 # SSLContext also enables these by default
1054 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001055 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1056 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001057 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001058 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001059 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001060 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001061 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1062 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001063 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001064 # Ubuntu has OP_NO_SSLv3 forced on by default
1065 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001066 else:
1067 with self.assertRaises(ValueError):
1068 ctx.options = 0
1069
Christian Heimesa170fa12017-09-15 20:27:30 +02001070 def test_verify_mode_protocol(self):
1071 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001072 # Default value
1073 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1074 ctx.verify_mode = ssl.CERT_OPTIONAL
1075 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1076 ctx.verify_mode = ssl.CERT_REQUIRED
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1078 ctx.verify_mode = ssl.CERT_NONE
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 with self.assertRaises(TypeError):
1081 ctx.verify_mode = None
1082 with self.assertRaises(ValueError):
1083 ctx.verify_mode = 42
1084
Christian Heimesa170fa12017-09-15 20:27:30 +02001085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1087 self.assertFalse(ctx.check_hostname)
1088
1089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1090 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1091 self.assertTrue(ctx.check_hostname)
1092
Christian Heimes61d478c2018-01-27 15:51:38 +01001093 def test_hostname_checks_common_name(self):
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1097 ctx.hostname_checks_common_name = True
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 ctx.hostname_checks_common_name = False
1100 self.assertFalse(ctx.hostname_checks_common_name)
1101 ctx.hostname_checks_common_name = True
1102 self.assertTrue(ctx.hostname_checks_common_name)
1103 else:
1104 with self.assertRaises(AttributeError):
1105 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001106
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001107 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1108 "required OpenSSL 1.1.0g")
1109 def test_min_max_version(self):
1110 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Miss Islington (bot)6ca71832019-01-18 07:29:08 -08001111 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1112 # Fedora override the setting to TLS 1.0.
1113 self.assertIn(
1114 ctx.minimum_version,
1115 {ssl.TLSVersion.MINIMUM_SUPPORTED, ssl.TLSVersion.TLSv1}
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001116 )
1117 self.assertEqual(
1118 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1119 )
1120
1121 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1122 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1123 self.assertEqual(
1124 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1125 )
1126 self.assertEqual(
1127 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1128 )
1129
1130 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1131 ctx.maximum_version = ssl.TLSVersion.TLSv1
1132 self.assertEqual(
1133 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1134 )
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.TLSv1
1137 )
1138
1139 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1140 self.assertEqual(
1141 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1142 )
1143
1144 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1145 self.assertIn(
1146 ctx.maximum_version,
1147 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1148 )
1149
1150 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1151 self.assertIn(
1152 ctx.minimum_version,
1153 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1154 )
1155
1156 with self.assertRaises(ValueError):
1157 ctx.minimum_version = 42
1158
1159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1160
1161 self.assertEqual(
1162 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1163 )
1164 self.assertEqual(
1165 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1166 )
1167 with self.assertRaises(ValueError):
1168 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1169 with self.assertRaises(ValueError):
1170 ctx.maximum_version = ssl.TLSVersion.TLSv1
1171
1172
Christian Heimes2427b502013-11-23 11:24:32 +01001173 @unittest.skipUnless(have_verify_flags(),
1174 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001175 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001177 # default value
1178 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1179 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001180 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1181 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1182 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1183 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1184 ctx.verify_flags = ssl.VERIFY_DEFAULT
1185 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1186 # supports any value
1187 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1188 self.assertEqual(ctx.verify_flags,
1189 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1190 with self.assertRaises(TypeError):
1191 ctx.verify_flags = None
1192
Antoine Pitrou152efa22010-05-16 18:19:27 +00001193 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001194 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001195 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001196 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001197 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1198 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001199 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001200 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001201 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001202 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001203 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001204 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001205 ctx.load_cert_chain(EMPTYCERT)
1206 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001207 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001208 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1209 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1210 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001211 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001212 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001213 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001214 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001215 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001216 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1217 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001218 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001219 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001220 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001221 # Password protected key and cert
1222 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1223 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1224 ctx.load_cert_chain(CERTFILE_PROTECTED,
1225 password=bytearray(KEY_PASSWORD.encode()))
1226 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1227 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1228 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1229 bytearray(KEY_PASSWORD.encode()))
1230 with self.assertRaisesRegex(TypeError, "should be a string"):
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1232 with self.assertRaises(ssl.SSLError):
1233 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1234 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1235 # openssl has a fixed limit on the password buffer.
1236 # PEM_BUFSIZE is generally set to 1kb.
1237 # Return a string larger than this.
1238 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1239 # Password callback
1240 def getpass_unicode():
1241 return KEY_PASSWORD
1242 def getpass_bytes():
1243 return KEY_PASSWORD.encode()
1244 def getpass_bytearray():
1245 return bytearray(KEY_PASSWORD.encode())
1246 def getpass_badpass():
1247 return "badpass"
1248 def getpass_huge():
1249 return b'a' * (1024 * 1024)
1250 def getpass_bad_type():
1251 return 9
1252 def getpass_exception():
1253 raise Exception('getpass error')
1254 class GetPassCallable:
1255 def __call__(self):
1256 return KEY_PASSWORD
1257 def getpass(self):
1258 return KEY_PASSWORD
1259 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1260 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1261 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1262 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1263 ctx.load_cert_chain(CERTFILE_PROTECTED,
1264 password=GetPassCallable().getpass)
1265 with self.assertRaises(ssl.SSLError):
1266 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1267 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1268 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1269 with self.assertRaisesRegex(TypeError, "must return a string"):
1270 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1271 with self.assertRaisesRegex(Exception, "getpass error"):
1272 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1273 # Make sure the password function isn't called if it isn't needed
1274 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001275
1276 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001277 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001278 ctx.load_verify_locations(CERTFILE)
1279 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1280 ctx.load_verify_locations(BYTES_CERTFILE)
1281 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1282 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001283 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001284 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001285 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001286 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001287 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001288 ctx.load_verify_locations(BADCERT)
1289 ctx.load_verify_locations(CERTFILE, CAPATH)
1290 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1291
Victor Stinner80f75e62011-01-29 11:31:20 +00001292 # Issue #10989: crash if the second argument type is invalid
1293 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1294
Christian Heimesefff7062013-11-21 03:35:02 +01001295 def test_load_verify_cadata(self):
1296 # test cadata
1297 with open(CAFILE_CACERT) as f:
1298 cacert_pem = f.read()
1299 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1300 with open(CAFILE_NEURONIO) as f:
1301 neuronio_pem = f.read()
1302 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1303
1304 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001305 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001306 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1307 ctx.load_verify_locations(cadata=cacert_pem)
1308 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1309 ctx.load_verify_locations(cadata=neuronio_pem)
1310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1311 # cert already in hash table
1312 ctx.load_verify_locations(cadata=neuronio_pem)
1313 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1314
1315 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001316 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001317 combined = "\n".join((cacert_pem, neuronio_pem))
1318 ctx.load_verify_locations(cadata=combined)
1319 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1320
1321 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001322 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001323 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1324 neuronio_pem, "tail"]
1325 ctx.load_verify_locations(cadata="\n".join(combined))
1326 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1327
1328 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001329 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001330 ctx.load_verify_locations(cadata=cacert_der)
1331 ctx.load_verify_locations(cadata=neuronio_der)
1332 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1333 # cert already in hash table
1334 ctx.load_verify_locations(cadata=cacert_der)
1335 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1336
1337 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001338 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001339 combined = b"".join((cacert_der, neuronio_der))
1340 ctx.load_verify_locations(cadata=combined)
1341 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1342
1343 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001344 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001345 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1346
1347 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1348 ctx.load_verify_locations(cadata="broken")
1349 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1350 ctx.load_verify_locations(cadata=b"broken")
1351
1352
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001353 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001354 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001355 ctx.load_dh_params(DHFILE)
1356 if os.name != 'nt':
1357 ctx.load_dh_params(BYTES_DHFILE)
1358 self.assertRaises(TypeError, ctx.load_dh_params)
1359 self.assertRaises(TypeError, ctx.load_dh_params, None)
1360 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001361 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001362 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001363 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001364 ctx.load_dh_params(CERTFILE)
1365
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001366 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001367 def test_session_stats(self):
1368 for proto in PROTOCOLS:
1369 ctx = ssl.SSLContext(proto)
1370 self.assertEqual(ctx.session_stats(), {
1371 'number': 0,
1372 'connect': 0,
1373 'connect_good': 0,
1374 'connect_renegotiate': 0,
1375 'accept': 0,
1376 'accept_good': 0,
1377 'accept_renegotiate': 0,
1378 'hits': 0,
1379 'misses': 0,
1380 'timeouts': 0,
1381 'cache_full': 0,
1382 })
1383
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001384 def test_set_default_verify_paths(self):
1385 # There's not much we can do to test that it acts as expected,
1386 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001387 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001388 ctx.set_default_verify_paths()
1389
Antoine Pitrou501da612011-12-21 09:27:41 +01001390 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001391 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001392 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001393 ctx.set_ecdh_curve("prime256v1")
1394 ctx.set_ecdh_curve(b"prime256v1")
1395 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1396 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1397 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1398 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1399
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001400 @needs_sni
1401 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001402 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001403
1404 # set_servername_callback expects a callable, or None
1405 self.assertRaises(TypeError, ctx.set_servername_callback)
1406 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1407 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1408 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1409
1410 def dummycallback(sock, servername, ctx):
1411 pass
1412 ctx.set_servername_callback(None)
1413 ctx.set_servername_callback(dummycallback)
1414
1415 @needs_sni
1416 def test_sni_callback_refcycle(self):
1417 # Reference cycles through the servername callback are detected
1418 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001419 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001420 def dummycallback(sock, servername, ctx, cycle=ctx):
1421 pass
1422 ctx.set_servername_callback(dummycallback)
1423 wr = weakref.ref(ctx)
1424 del ctx, dummycallback
1425 gc.collect()
1426 self.assertIs(wr(), None)
1427
Christian Heimes9a5395a2013-06-17 15:44:12 +02001428 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001430 self.assertEqual(ctx.cert_store_stats(),
1431 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1432 ctx.load_cert_chain(CERTFILE)
1433 self.assertEqual(ctx.cert_store_stats(),
1434 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1435 ctx.load_verify_locations(CERTFILE)
1436 self.assertEqual(ctx.cert_store_stats(),
1437 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001438 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001439 self.assertEqual(ctx.cert_store_stats(),
1440 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1441
1442 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001443 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001444 self.assertEqual(ctx.get_ca_certs(), [])
1445 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1446 ctx.load_verify_locations(CERTFILE)
1447 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001448 # but CAFILE_CACERT is a CA cert
1449 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001450 self.assertEqual(ctx.get_ca_certs(),
1451 [{'issuer': ((('organizationName', 'Root CA'),),
1452 (('organizationalUnitName', 'http://www.cacert.org'),),
1453 (('commonName', 'CA Cert Signing Authority'),),
1454 (('emailAddress', 'support@cacert.org'),)),
1455 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1456 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1457 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001458 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001459 'subject': ((('organizationName', 'Root CA'),),
1460 (('organizationalUnitName', 'http://www.cacert.org'),),
1461 (('commonName', 'CA Cert Signing Authority'),),
1462 (('emailAddress', 'support@cacert.org'),)),
1463 'version': 3}])
1464
Martin Panterb55f8b72016-01-14 12:53:56 +00001465 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001466 pem = f.read()
1467 der = ssl.PEM_cert_to_DER_cert(pem)
1468 self.assertEqual(ctx.get_ca_certs(True), [der])
1469
Christian Heimes72d28502013-11-23 13:56:58 +01001470 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001471 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001472 ctx.load_default_certs()
1473
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1476 ctx.load_default_certs()
1477
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001479 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1480
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001482 self.assertRaises(TypeError, ctx.load_default_certs, None)
1483 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1484
Benjamin Peterson91244e02014-10-03 18:17:15 -04001485 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001486 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001487 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001488 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001489 with support.EnvironmentVarGuard() as env:
1490 env["SSL_CERT_DIR"] = CAPATH
1491 env["SSL_CERT_FILE"] = CERTFILE
1492 ctx.load_default_certs()
1493 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1494
Benjamin Peterson91244e02014-10-03 18:17:15 -04001495 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001496 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001497 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001498 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001499 ctx.load_default_certs()
1500 stats = ctx.cert_store_stats()
1501
Christian Heimesa170fa12017-09-15 20:27:30 +02001502 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001503 with support.EnvironmentVarGuard() as env:
1504 env["SSL_CERT_DIR"] = CAPATH
1505 env["SSL_CERT_FILE"] = CERTFILE
1506 ctx.load_default_certs()
1507 stats["x509"] += 1
1508 self.assertEqual(ctx.cert_store_stats(), stats)
1509
Christian Heimes358cfd42016-09-10 22:43:48 +02001510 def _assert_context_options(self, ctx):
1511 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1512 if OP_NO_COMPRESSION != 0:
1513 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1514 OP_NO_COMPRESSION)
1515 if OP_SINGLE_DH_USE != 0:
1516 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1517 OP_SINGLE_DH_USE)
1518 if OP_SINGLE_ECDH_USE != 0:
1519 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1520 OP_SINGLE_ECDH_USE)
1521 if OP_CIPHER_SERVER_PREFERENCE != 0:
1522 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1523 OP_CIPHER_SERVER_PREFERENCE)
1524
Christian Heimes4c05b472013-11-23 15:58:30 +01001525 def test_create_default_context(self):
1526 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001527
Christian Heimesa170fa12017-09-15 20:27:30 +02001528 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001529 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001530 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001531 self._assert_context_options(ctx)
1532
Christian Heimes4c05b472013-11-23 15:58:30 +01001533 with open(SIGNING_CA) as f:
1534 cadata = f.read()
1535 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1536 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001537 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001538 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001539 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
1541 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001543 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001544 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001545
Christian Heimes67986f92013-11-23 22:43:47 +01001546 def test__create_stdlib_context(self):
1547 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001548 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001549 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001550 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001551 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001552
1553 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1554 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1555 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001556 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001557
1558 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001559 cert_reqs=ssl.CERT_REQUIRED,
1560 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001561 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001563 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001564 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001565
1566 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001567 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001568 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001569 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001570
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001573 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001574 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
Christian Heimese82c0342017-09-15 20:29:57 +02001576 # Auto set CERT_REQUIRED
1577 ctx.check_hostname = True
1578 self.assertTrue(ctx.check_hostname)
1579 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1580 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001581 ctx.verify_mode = ssl.CERT_REQUIRED
1582 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001583 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
Christian Heimese82c0342017-09-15 20:29:57 +02001585 # Changing verify_mode does not affect check_hostname
1586 ctx.check_hostname = False
1587 ctx.verify_mode = ssl.CERT_NONE
1588 ctx.check_hostname = False
1589 self.assertFalse(ctx.check_hostname)
1590 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1591 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001592 ctx.check_hostname = True
1593 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001594 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1595
1596 ctx.check_hostname = False
1597 ctx.verify_mode = ssl.CERT_OPTIONAL
1598 ctx.check_hostname = False
1599 self.assertFalse(ctx.check_hostname)
1600 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1601 # keep CERT_OPTIONAL
1602 ctx.check_hostname = True
1603 self.assertTrue(ctx.check_hostname)
1604 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001605
1606 # Cannot set CERT_NONE with check_hostname enabled
1607 with self.assertRaises(ValueError):
1608 ctx.verify_mode = ssl.CERT_NONE
1609 ctx.check_hostname = False
1610 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001611 ctx.verify_mode = ssl.CERT_NONE
1612 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001613
Christian Heimes5fe668c2016-09-12 00:01:11 +02001614 def test_context_client_server(self):
1615 # PROTOCOL_TLS_CLIENT has sane defaults
1616 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1617 self.assertTrue(ctx.check_hostname)
1618 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1619
1620 # PROTOCOL_TLS_SERVER has different but also sane defaults
1621 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1622 self.assertFalse(ctx.check_hostname)
1623 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1624
Christian Heimes4df60f12017-09-15 20:26:05 +02001625 def test_context_custom_class(self):
1626 class MySSLSocket(ssl.SSLSocket):
1627 pass
1628
1629 class MySSLObject(ssl.SSLObject):
1630 pass
1631
1632 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1633 ctx.sslsocket_class = MySSLSocket
1634 ctx.sslobject_class = MySSLObject
1635
1636 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1637 self.assertIsInstance(sock, MySSLSocket)
1638 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1639 self.assertIsInstance(obj, MySSLObject)
1640
Antoine Pitrou152efa22010-05-16 18:19:27 +00001641
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001642class SSLErrorTests(unittest.TestCase):
1643
1644 def test_str(self):
1645 # The str() of a SSLError doesn't include the errno
1646 e = ssl.SSLError(1, "foo")
1647 self.assertEqual(str(e), "foo")
1648 self.assertEqual(e.errno, 1)
1649 # Same for a subclass
1650 e = ssl.SSLZeroReturnError(1, "foo")
1651 self.assertEqual(str(e), "foo")
1652 self.assertEqual(e.errno, 1)
1653
1654 def test_lib_reason(self):
1655 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001656 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001657 with self.assertRaises(ssl.SSLError) as cm:
1658 ctx.load_dh_params(CERTFILE)
1659 self.assertEqual(cm.exception.library, 'PEM')
1660 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1661 s = str(cm.exception)
1662 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1663
1664 def test_subclass(self):
1665 # Check that the appropriate SSLError subclass is raised
1666 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001667 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1668 ctx.check_hostname = False
1669 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001670 with socket.socket() as s:
1671 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001672 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001673 c = socket.socket()
1674 c.connect(s.getsockname())
1675 c.setblocking(False)
1676 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001677 with self.assertRaises(ssl.SSLWantReadError) as cm:
1678 c.do_handshake()
1679 s = str(cm.exception)
1680 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1681 # For compatibility
1682 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1683
1684
Christian Heimes61d478c2018-01-27 15:51:38 +01001685 def test_bad_server_hostname(self):
1686 ctx = ssl.create_default_context()
1687 with self.assertRaises(ValueError):
1688 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1689 server_hostname="")
1690 with self.assertRaises(ValueError):
1691 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1692 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001693 with self.assertRaises(TypeError):
1694 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1695 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001696
1697
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001698class MemoryBIOTests(unittest.TestCase):
1699
1700 def test_read_write(self):
1701 bio = ssl.MemoryBIO()
1702 bio.write(b'foo')
1703 self.assertEqual(bio.read(), b'foo')
1704 self.assertEqual(bio.read(), b'')
1705 bio.write(b'foo')
1706 bio.write(b'bar')
1707 self.assertEqual(bio.read(), b'foobar')
1708 self.assertEqual(bio.read(), b'')
1709 bio.write(b'baz')
1710 self.assertEqual(bio.read(2), b'ba')
1711 self.assertEqual(bio.read(1), b'z')
1712 self.assertEqual(bio.read(1), b'')
1713
1714 def test_eof(self):
1715 bio = ssl.MemoryBIO()
1716 self.assertFalse(bio.eof)
1717 self.assertEqual(bio.read(), b'')
1718 self.assertFalse(bio.eof)
1719 bio.write(b'foo')
1720 self.assertFalse(bio.eof)
1721 bio.write_eof()
1722 self.assertFalse(bio.eof)
1723 self.assertEqual(bio.read(2), b'fo')
1724 self.assertFalse(bio.eof)
1725 self.assertEqual(bio.read(1), b'o')
1726 self.assertTrue(bio.eof)
1727 self.assertEqual(bio.read(), b'')
1728 self.assertTrue(bio.eof)
1729
1730 def test_pending(self):
1731 bio = ssl.MemoryBIO()
1732 self.assertEqual(bio.pending, 0)
1733 bio.write(b'foo')
1734 self.assertEqual(bio.pending, 3)
1735 for i in range(3):
1736 bio.read(1)
1737 self.assertEqual(bio.pending, 3-i-1)
1738 for i in range(3):
1739 bio.write(b'x')
1740 self.assertEqual(bio.pending, i+1)
1741 bio.read()
1742 self.assertEqual(bio.pending, 0)
1743
1744 def test_buffer_types(self):
1745 bio = ssl.MemoryBIO()
1746 bio.write(b'foo')
1747 self.assertEqual(bio.read(), b'foo')
1748 bio.write(bytearray(b'bar'))
1749 self.assertEqual(bio.read(), b'bar')
1750 bio.write(memoryview(b'baz'))
1751 self.assertEqual(bio.read(), b'baz')
1752
1753 def test_error_types(self):
1754 bio = ssl.MemoryBIO()
1755 self.assertRaises(TypeError, bio.write, 'foo')
1756 self.assertRaises(TypeError, bio.write, None)
1757 self.assertRaises(TypeError, bio.write, True)
1758 self.assertRaises(TypeError, bio.write, 1)
1759
1760
Christian Heimes89c20512018-02-27 11:17:32 +01001761class SSLObjectTests(unittest.TestCase):
1762 def test_private_init(self):
1763 bio = ssl.MemoryBIO()
1764 with self.assertRaisesRegex(TypeError, "public constructor"):
1765 ssl.SSLObject(bio, bio)
1766
Miss Islington (bot)c00f7032018-09-21 22:00:42 -07001767 def test_unwrap(self):
1768 client_ctx, server_ctx, hostname = testing_context()
1769 c_in = ssl.MemoryBIO()
1770 c_out = ssl.MemoryBIO()
1771 s_in = ssl.MemoryBIO()
1772 s_out = ssl.MemoryBIO()
1773 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1774 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1775
1776 # Loop on the handshake for a bit to get it settled
1777 for _ in range(5):
1778 try:
1779 client.do_handshake()
1780 except ssl.SSLWantReadError:
1781 pass
1782 if c_out.pending:
1783 s_in.write(c_out.read())
1784 try:
1785 server.do_handshake()
1786 except ssl.SSLWantReadError:
1787 pass
1788 if s_out.pending:
1789 c_in.write(s_out.read())
1790 # Now the handshakes should be complete (don't raise WantReadError)
1791 client.do_handshake()
1792 server.do_handshake()
1793
1794 # Now if we unwrap one side unilaterally, it should send close-notify
1795 # and raise WantReadError:
1796 with self.assertRaises(ssl.SSLWantReadError):
1797 client.unwrap()
1798
1799 # But server.unwrap() does not raise, because it reads the client's
1800 # close-notify:
1801 s_in.write(c_out.read())
1802 server.unwrap()
1803
1804 # And now that the client gets the server's close-notify, it doesn't
1805 # raise either.
1806 c_in.write(s_out.read())
1807 client.unwrap()
Christian Heimes89c20512018-02-27 11:17:32 +01001808
Martin Panter3840b2a2016-03-27 01:53:46 +00001809class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001810 """Tests that connect to a simple server running in the background"""
1811
1812 def setUp(self):
1813 server = ThreadedEchoServer(SIGNED_CERTFILE)
1814 self.server_addr = (HOST, server.port)
1815 server.__enter__()
1816 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001817
Antoine Pitrou480a1242010-04-28 21:37:09 +00001818 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001819 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001820 cert_reqs=ssl.CERT_NONE) as s:
1821 s.connect(self.server_addr)
1822 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001823 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001824
Martin Panter3840b2a2016-03-27 01:53:46 +00001825 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001826 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001827 cert_reqs=ssl.CERT_REQUIRED,
1828 ca_certs=SIGNING_CA) as s:
1829 s.connect(self.server_addr)
1830 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001831 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001832
Martin Panter3840b2a2016-03-27 01:53:46 +00001833 def test_connect_fail(self):
1834 # This should fail because we have no verification certs. Connection
1835 # failure crashes ThreadedEchoServer, so run this in an independent
1836 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001837 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001838 cert_reqs=ssl.CERT_REQUIRED)
1839 self.addCleanup(s.close)
1840 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1841 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001842
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001843 def test_connect_ex(self):
1844 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001845 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001846 cert_reqs=ssl.CERT_REQUIRED,
1847 ca_certs=SIGNING_CA)
1848 self.addCleanup(s.close)
1849 self.assertEqual(0, s.connect_ex(self.server_addr))
1850 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001851
1852 def test_non_blocking_connect_ex(self):
1853 # Issue #11326: non-blocking connect_ex() should allow handshake
1854 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001855 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001856 cert_reqs=ssl.CERT_REQUIRED,
1857 ca_certs=SIGNING_CA,
1858 do_handshake_on_connect=False)
1859 self.addCleanup(s.close)
1860 s.setblocking(False)
1861 rc = s.connect_ex(self.server_addr)
1862 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1863 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1864 # Wait for connect to finish
1865 select.select([], [s], [], 5.0)
1866 # Non-blocking handshake
1867 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001868 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001869 s.do_handshake()
1870 break
1871 except ssl.SSLWantReadError:
1872 select.select([s], [], [], 5.0)
1873 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001874 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001875 # SSL established
1876 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001877
Antoine Pitrou152efa22010-05-16 18:19:27 +00001878 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001879 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001880 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001881 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1882 s.connect(self.server_addr)
1883 self.assertEqual({}, s.getpeercert())
1884 # Same with a server hostname
1885 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1886 server_hostname="dummy") as s:
1887 s.connect(self.server_addr)
1888 ctx.verify_mode = ssl.CERT_REQUIRED
1889 # This should succeed because we specify the root cert
1890 ctx.load_verify_locations(SIGNING_CA)
1891 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1892 s.connect(self.server_addr)
1893 cert = s.getpeercert()
1894 self.assertTrue(cert)
1895
1896 def test_connect_with_context_fail(self):
1897 # This should fail because we have no verification certs. Connection
1898 # failure crashes ThreadedEchoServer, so run this in an independent
1899 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001900 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001901 ctx.verify_mode = ssl.CERT_REQUIRED
1902 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1903 self.addCleanup(s.close)
1904 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1905 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001906
1907 def test_connect_capath(self):
1908 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001909 # NOTE: the subject hashing algorithm has been changed between
1910 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1911 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001912 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001913 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 ctx.verify_mode = ssl.CERT_REQUIRED
1915 ctx.load_verify_locations(capath=CAPATH)
1916 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1917 s.connect(self.server_addr)
1918 cert = s.getpeercert()
1919 self.assertTrue(cert)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07001920
Martin Panter3840b2a2016-03-27 01:53:46 +00001921 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001922 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001923 ctx.verify_mode = ssl.CERT_REQUIRED
1924 ctx.load_verify_locations(capath=BYTES_CAPATH)
1925 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1926 s.connect(self.server_addr)
1927 cert = s.getpeercert()
1928 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001929
Christian Heimesefff7062013-11-21 03:35:02 +01001930 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001931 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001932 pem = f.read()
1933 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001934 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001935 ctx.verify_mode = ssl.CERT_REQUIRED
1936 ctx.load_verify_locations(cadata=pem)
1937 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1938 s.connect(self.server_addr)
1939 cert = s.getpeercert()
1940 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001941
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001943 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001944 ctx.verify_mode = ssl.CERT_REQUIRED
1945 ctx.load_verify_locations(cadata=der)
1946 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1947 s.connect(self.server_addr)
1948 cert = s.getpeercert()
1949 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001950
Antoine Pitroue3220242010-04-24 11:13:53 +00001951 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1952 def test_makefile_close(self):
1953 # Issue #5238: creating a file-like object with makefile() shouldn't
1954 # delay closing the underlying "real socket" (here tested with its
1955 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001956 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001957 ss.connect(self.server_addr)
1958 fd = ss.fileno()
1959 f = ss.makefile()
1960 f.close()
1961 # The fd is still open
1962 os.read(fd, 0)
1963 # Closing the SSL socket should close the fd too
1964 ss.close()
1965 gc.collect()
1966 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001967 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001968 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001969
Antoine Pitrou480a1242010-04-28 21:37:09 +00001970 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001971 s = socket.socket(socket.AF_INET)
1972 s.connect(self.server_addr)
1973 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001974 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001975 cert_reqs=ssl.CERT_NONE,
1976 do_handshake_on_connect=False)
1977 self.addCleanup(s.close)
1978 count = 0
1979 while True:
1980 try:
1981 count += 1
1982 s.do_handshake()
1983 break
1984 except ssl.SSLWantReadError:
1985 select.select([s], [], [])
1986 except ssl.SSLWantWriteError:
1987 select.select([], [s], [])
1988 if support.verbose:
1989 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001990
Antoine Pitrou480a1242010-04-28 21:37:09 +00001991 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001992 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001993
Martin Panter3840b2a2016-03-27 01:53:46 +00001994 def test_get_server_certificate_fail(self):
1995 # Connection failure crashes ThreadedEchoServer, so run this in an
1996 # independent test method
1997 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001998
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001999 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002000 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002001 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2002 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002003 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002004 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2005 s.connect(self.server_addr)
2006 # Error checking can happen at instantiation or when connecting
2007 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2008 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002009 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2011 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002012
Christian Heimes9a5395a2013-06-17 15:44:12 +02002013 def test_get_ca_certs_capath(self):
2014 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002016 ctx.load_verify_locations(capath=CAPATH)
2017 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002018 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2019 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002020 s.connect(self.server_addr)
2021 cert = s.getpeercert()
2022 self.assertTrue(cert)
2023 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002024
Christian Heimes575596e2013-12-15 21:49:17 +01002025 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002026 def test_context_setget(self):
2027 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002028 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2029 ctx1.load_verify_locations(capath=CAPATH)
2030 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2031 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002032 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002033 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002034 ss.connect(self.server_addr)
2035 self.assertIs(ss.context, ctx1)
2036 self.assertIs(ss._sslobj.context, ctx1)
2037 ss.context = ctx2
2038 self.assertIs(ss.context, ctx2)
2039 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002040
2041 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2042 # A simple IO loop. Call func(*args) depending on the error we get
2043 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2044 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002045 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002046 count = 0
2047 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002048 if time.monotonic() > deadline:
2049 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002050 errno = None
2051 count += 1
2052 try:
2053 ret = func(*args)
2054 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002055 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002056 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002057 raise
2058 errno = e.errno
2059 # Get any data from the outgoing BIO irrespective of any error, and
2060 # send it to the socket.
2061 buf = outgoing.read()
2062 sock.sendall(buf)
2063 # If there's no error, we're done. For WANT_READ, we need to get
2064 # data from the socket and put it in the incoming BIO.
2065 if errno is None:
2066 break
2067 elif errno == ssl.SSL_ERROR_WANT_READ:
2068 buf = sock.recv(32768)
2069 if buf:
2070 incoming.write(buf)
2071 else:
2072 incoming.write_eof()
2073 if support.verbose:
2074 sys.stdout.write("Needed %d calls to complete %s().\n"
2075 % (count, func.__name__))
2076 return ret
2077
Martin Panter3840b2a2016-03-27 01:53:46 +00002078 def test_bio_handshake(self):
2079 sock = socket.socket(socket.AF_INET)
2080 self.addCleanup(sock.close)
2081 sock.connect(self.server_addr)
2082 incoming = ssl.MemoryBIO()
2083 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002084 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2085 self.assertTrue(ctx.check_hostname)
2086 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002087 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002088 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2089 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002090 self.assertIs(sslobj._sslobj.owner, sslobj)
2091 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002092 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002093 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002094 self.assertRaises(ValueError, sslobj.getpeercert)
2095 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2096 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2097 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2098 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002099 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002100 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002101 self.assertTrue(sslobj.getpeercert())
2102 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2103 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2104 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002105 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002106 except ssl.SSLSyscallError:
2107 # If the server shuts down the TCP connection without sending a
2108 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2109 pass
2110 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2111
2112 def test_bio_read_write_data(self):
2113 sock = socket.socket(socket.AF_INET)
2114 self.addCleanup(sock.close)
2115 sock.connect(self.server_addr)
2116 incoming = ssl.MemoryBIO()
2117 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002118 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002119 ctx.verify_mode = ssl.CERT_NONE
2120 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2121 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2122 req = b'FOO\n'
2123 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2124 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2125 self.assertEqual(buf, b'foo\n')
2126 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002127
2128
Martin Panter3840b2a2016-03-27 01:53:46 +00002129class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002130
Martin Panter3840b2a2016-03-27 01:53:46 +00002131 def test_timeout_connect_ex(self):
2132 # Issue #12065: on a timeout, connect_ex() should return the original
2133 # errno (mimicking the behaviour of non-SSL sockets).
2134 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002135 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002136 cert_reqs=ssl.CERT_REQUIRED,
2137 do_handshake_on_connect=False)
2138 self.addCleanup(s.close)
2139 s.settimeout(0.0000001)
2140 rc = s.connect_ex((REMOTE_HOST, 443))
2141 if rc == 0:
2142 self.skipTest("REMOTE_HOST responded too quickly")
2143 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2144
2145 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2146 def test_get_server_certificate_ipv6(self):
2147 with support.transient_internet('ipv6.google.com'):
2148 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2149 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2150
Martin Panter3840b2a2016-03-27 01:53:46 +00002151
2152def _test_get_server_certificate(test, host, port, cert=None):
2153 pem = ssl.get_server_certificate((host, port))
2154 if not pem:
2155 test.fail("No server certificate on %s:%s!" % (host, port))
2156
2157 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2158 if not pem:
2159 test.fail("No server certificate on %s:%s!" % (host, port))
2160 if support.verbose:
2161 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2162
2163def _test_get_server_certificate_fail(test, host, port):
2164 try:
2165 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2166 except ssl.SSLError as x:
2167 #should fail
2168 if support.verbose:
2169 sys.stdout.write("%s\n" % x)
2170 else:
2171 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2172
2173
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002174from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002175
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002176class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002178 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002179
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002180 """A mildly complicated class, because we want it to work both
2181 with and without the SSL wrapper around the socket connection, so
2182 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002183
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002184 def __init__(self, server, connsock, addr):
2185 self.server = server
2186 self.running = False
2187 self.sock = connsock
2188 self.addr = addr
2189 self.sock.setblocking(1)
2190 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002191 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002192 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002194 def wrap_conn(self):
2195 try:
2196 self.sslconn = self.server.context.wrap_socket(
2197 self.sock, server_side=True)
2198 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2199 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002200 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002201 # We treat ConnectionResetError as though it were an
2202 # SSLError - OpenSSL on Ubuntu abruptly closes the
2203 # connection when asked to use an unsupported protocol.
2204 #
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002205 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2206 # tries to send session tickets after handshake.
2207 # https://github.com/openssl/openssl/issues/6342
2208 self.server.conn_errors.append(str(e))
2209 if self.server.chatty:
2210 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2211 self.running = False
2212 self.close()
2213 return False
2214 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002215 # OSError may occur with wrong protocols, e.g. both
2216 # sides use PROTOCOL_TLS_SERVER.
2217 #
2218 # XXX Various errors can have happened here, for example
2219 # a mismatching protocol version, an invalid certificate,
2220 # or a low-level bug. This should be made more discriminating.
2221 #
2222 # bpo-31323: Store the exception as string to prevent
2223 # a reference leak: server -> conn_errors -> exception
2224 # -> traceback -> self (ConnectionHandler) -> server
2225 self.server.conn_errors.append(str(e))
2226 if self.server.chatty:
2227 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2228 self.running = False
2229 self.server.stop()
2230 self.close()
2231 return False
2232 else:
2233 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2234 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2235 cert = self.sslconn.getpeercert()
2236 if support.verbose and self.server.chatty:
2237 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2238 cert_binary = self.sslconn.getpeercert(True)
2239 if support.verbose and self.server.chatty:
2240 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2241 cipher = self.sslconn.cipher()
2242 if support.verbose and self.server.chatty:
2243 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2244 sys.stdout.write(" server: selected protocol is now "
2245 + str(self.sslconn.selected_npn_protocol()) + "\n")
2246 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002247
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002248 def read(self):
2249 if self.sslconn:
2250 return self.sslconn.read()
2251 else:
2252 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002253
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002254 def write(self, bytes):
2255 if self.sslconn:
2256 return self.sslconn.write(bytes)
2257 else:
2258 return self.sock.send(bytes)
2259
2260 def close(self):
2261 if self.sslconn:
2262 self.sslconn.close()
2263 else:
2264 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002265
Antoine Pitrou480a1242010-04-28 21:37:09 +00002266 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002267 self.running = True
2268 if not self.server.starttls_server:
2269 if not self.wrap_conn():
2270 return
2271 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002272 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002273 msg = self.read()
2274 stripped = msg.strip()
2275 if not stripped:
2276 # eof, so quit this handler
2277 self.running = False
2278 try:
2279 self.sock = self.sslconn.unwrap()
2280 except OSError:
2281 # Many tests shut the TCP connection down
2282 # without an SSL shutdown. This causes
2283 # unwrap() to raise OSError with errno=0!
2284 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002285 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002286 self.sslconn = None
2287 self.close()
2288 elif stripped == b'over':
2289 if support.verbose and self.server.connectionchatty:
2290 sys.stdout.write(" server: client closed connection\n")
2291 self.close()
2292 return
2293 elif (self.server.starttls_server and
2294 stripped == b'STARTTLS'):
2295 if support.verbose and self.server.connectionchatty:
2296 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2297 self.write(b"OK\n")
2298 if not self.wrap_conn():
2299 return
2300 elif (self.server.starttls_server and self.sslconn
2301 and stripped == b'ENDTLS'):
2302 if support.verbose and self.server.connectionchatty:
2303 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2304 self.write(b"OK\n")
2305 self.sock = self.sslconn.unwrap()
2306 self.sslconn = None
2307 if support.verbose and self.server.connectionchatty:
2308 sys.stdout.write(" server: connection is now unencrypted...\n")
2309 elif stripped == b'CB tls-unique':
2310 if support.verbose and self.server.connectionchatty:
2311 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2312 data = self.sslconn.get_channel_binding("tls-unique")
2313 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes2756ef32018-09-23 09:22:52 +02002314 elif stripped == b'PHA':
2315 if support.verbose and self.server.connectionchatty:
2316 sys.stdout.write(" server: initiating post handshake auth\n")
2317 try:
2318 self.sslconn.verify_client_post_handshake()
2319 except ssl.SSLError as e:
2320 self.write(repr(e).encode("us-ascii") + b"\n")
2321 else:
2322 self.write(b"OK\n")
2323 elif stripped == b'HASCERT':
2324 if self.sslconn.getpeercert() is not None:
2325 self.write(b'TRUE\n')
2326 else:
2327 self.write(b'FALSE\n')
2328 elif stripped == b'GETCERT':
2329 cert = self.sslconn.getpeercert()
2330 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002331 else:
2332 if (support.verbose and
2333 self.server.connectionchatty):
2334 ctype = (self.sslconn and "encrypted") or "unencrypted"
2335 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2336 % (msg, ctype, msg.lower(), ctype))
2337 self.write(msg.lower())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002338 except ConnectionResetError:
2339 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2340 # when connection is not shut down gracefully.
2341 if self.server.chatty and support.verbose:
2342 sys.stdout.write(
2343 " Connection reset by peer: {}\n".format(
2344 self.addr)
2345 )
2346 self.close()
2347 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002348 except OSError:
2349 if self.server.chatty:
2350 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002351 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002352 self.running = False
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002353
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002354 # normally, we'd just stop here, but for the test
2355 # harness, we want to stop the server
2356 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002357
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002358 def __init__(self, certificate=None, ssl_version=None,
2359 certreqs=None, cacerts=None,
2360 chatty=True, connectionchatty=False, starttls_server=False,
2361 npn_protocols=None, alpn_protocols=None,
2362 ciphers=None, context=None):
2363 if context:
2364 self.context = context
2365 else:
2366 self.context = ssl.SSLContext(ssl_version
2367 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002368 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002369 self.context.verify_mode = (certreqs if certreqs is not None
2370 else ssl.CERT_NONE)
2371 if cacerts:
2372 self.context.load_verify_locations(cacerts)
2373 if certificate:
2374 self.context.load_cert_chain(certificate)
2375 if npn_protocols:
2376 self.context.set_npn_protocols(npn_protocols)
2377 if alpn_protocols:
2378 self.context.set_alpn_protocols(alpn_protocols)
2379 if ciphers:
2380 self.context.set_ciphers(ciphers)
2381 self.chatty = chatty
2382 self.connectionchatty = connectionchatty
2383 self.starttls_server = starttls_server
2384 self.sock = socket.socket()
2385 self.port = support.bind_port(self.sock)
2386 self.flag = None
2387 self.active = False
2388 self.selected_npn_protocols = []
2389 self.selected_alpn_protocols = []
2390 self.shared_ciphers = []
2391 self.conn_errors = []
2392 threading.Thread.__init__(self)
2393 self.daemon = True
2394
2395 def __enter__(self):
2396 self.start(threading.Event())
2397 self.flag.wait()
2398 return self
2399
2400 def __exit__(self, *args):
2401 self.stop()
2402 self.join()
2403
2404 def start(self, flag=None):
2405 self.flag = flag
2406 threading.Thread.start(self)
2407
2408 def run(self):
2409 self.sock.settimeout(0.05)
2410 self.sock.listen()
2411 self.active = True
2412 if self.flag:
2413 # signal an event
2414 self.flag.set()
2415 while self.active:
2416 try:
2417 newconn, connaddr = self.sock.accept()
2418 if support.verbose and self.chatty:
2419 sys.stdout.write(' server: new connection from '
2420 + repr(connaddr) + '\n')
2421 handler = self.ConnectionHandler(self, newconn, connaddr)
2422 handler.start()
2423 handler.join()
2424 except socket.timeout:
2425 pass
2426 except KeyboardInterrupt:
2427 self.stop()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002428 except BaseException as e:
2429 if support.verbose and self.chatty:
2430 sys.stdout.write(
2431 ' connection handling failed: ' + repr(e) + '\n')
2432
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002433 self.sock.close()
2434
2435 def stop(self):
2436 self.active = False
2437
2438class AsyncoreEchoServer(threading.Thread):
2439
2440 # this one's based on asyncore.dispatcher
2441
2442 class EchoServer (asyncore.dispatcher):
2443
2444 class ConnectionHandler(asyncore.dispatcher_with_send):
2445
2446 def __init__(self, conn, certfile):
2447 self.socket = test_wrap_socket(conn, server_side=True,
2448 certfile=certfile,
2449 do_handshake_on_connect=False)
2450 asyncore.dispatcher_with_send.__init__(self, self.socket)
2451 self._ssl_accepting = True
2452 self._do_ssl_handshake()
2453
2454 def readable(self):
2455 if isinstance(self.socket, ssl.SSLSocket):
2456 while self.socket.pending() > 0:
2457 self.handle_read_event()
2458 return True
2459
2460 def _do_ssl_handshake(self):
2461 try:
2462 self.socket.do_handshake()
2463 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2464 return
2465 except ssl.SSLEOFError:
2466 return self.handle_close()
2467 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002468 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002469 except OSError as err:
2470 if err.args[0] == errno.ECONNABORTED:
2471 return self.handle_close()
2472 else:
2473 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002474
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 def handle_read(self):
2476 if self._ssl_accepting:
2477 self._do_ssl_handshake()
2478 else:
2479 data = self.recv(1024)
2480 if support.verbose:
2481 sys.stdout.write(" server: read %s from client\n" % repr(data))
2482 if not data:
2483 self.close()
2484 else:
2485 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002486
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002487 def handle_close(self):
2488 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002489 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002490 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002491
2492 def handle_error(self):
2493 raise
2494
Trent Nelson78520002008-04-10 20:54:35 +00002495 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002496 self.certfile = certfile
2497 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2498 self.port = support.bind_port(sock, '')
2499 asyncore.dispatcher.__init__(self, sock)
2500 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002501
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002502 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002503 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002504 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2505 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002506
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002507 def handle_error(self):
2508 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002509
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002510 def __init__(self, certfile):
2511 self.flag = None
2512 self.active = False
2513 self.server = self.EchoServer(certfile)
2514 self.port = self.server.port
2515 threading.Thread.__init__(self)
2516 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002517
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002518 def __str__(self):
2519 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002520
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 def __enter__(self):
2522 self.start(threading.Event())
2523 self.flag.wait()
2524 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002525
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002526 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002527 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002528 sys.stdout.write(" cleanup: stopping server.\n")
2529 self.stop()
2530 if support.verbose:
2531 sys.stdout.write(" cleanup: joining server thread.\n")
2532 self.join()
2533 if support.verbose:
2534 sys.stdout.write(" cleanup: successfully joined.\n")
2535 # make sure that ConnectionHandler is removed from socket_map
2536 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002537
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002538 def start (self, flag=None):
2539 self.flag = flag
2540 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002541
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002542 def run(self):
2543 self.active = True
2544 if self.flag:
2545 self.flag.set()
2546 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002547 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002548 asyncore.loop(1)
2549 except:
2550 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002551
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002552 def stop(self):
2553 self.active = False
2554 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002555
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002556def server_params_test(client_context, server_context, indata=b"FOO\n",
2557 chatty=True, connectionchatty=False, sni_name=None,
2558 session=None):
2559 """
2560 Launch a server, connect a client to it and try various reads
2561 and writes.
2562 """
2563 stats = {}
2564 server = ThreadedEchoServer(context=server_context,
2565 chatty=chatty,
2566 connectionchatty=False)
2567 with server:
2568 with client_context.wrap_socket(socket.socket(),
2569 server_hostname=sni_name, session=session) as s:
2570 s.connect((HOST, server.port))
2571 for arg in [indata, bytearray(indata), memoryview(indata)]:
2572 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002573 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002574 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002575 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002576 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002577 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002578 if connectionchatty:
2579 if support.verbose:
2580 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002581 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002582 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002583 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2584 % (outdata[:20], len(outdata),
2585 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002586 s.write(b"over\n")
2587 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002588 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002589 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002590 stats.update({
2591 'compression': s.compression(),
2592 'cipher': s.cipher(),
2593 'peercert': s.getpeercert(),
2594 'client_alpn_protocol': s.selected_alpn_protocol(),
2595 'client_npn_protocol': s.selected_npn_protocol(),
2596 'version': s.version(),
2597 'session_reused': s.session_reused,
2598 'session': s.session,
2599 })
2600 s.close()
2601 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2602 stats['server_npn_protocols'] = server.selected_npn_protocols
2603 stats['server_shared_ciphers'] = server.shared_ciphers
2604 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002606def try_protocol_combo(server_protocol, client_protocol, expect_success,
2607 certsreqs=None, server_options=0, client_options=0):
2608 """
2609 Try to SSL-connect using *client_protocol* to *server_protocol*.
2610 If *expect_success* is true, assert that the connection succeeds,
2611 if it's false, assert that the connection fails.
2612 Also, if *expect_success* is a string, assert that it is the protocol
2613 version actually used by the connection.
2614 """
2615 if certsreqs is None:
2616 certsreqs = ssl.CERT_NONE
2617 certtype = {
2618 ssl.CERT_NONE: "CERT_NONE",
2619 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2620 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2621 }[certsreqs]
2622 if support.verbose:
2623 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2624 sys.stdout.write(formatstr %
2625 (ssl.get_protocol_name(client_protocol),
2626 ssl.get_protocol_name(server_protocol),
2627 certtype))
2628 client_context = ssl.SSLContext(client_protocol)
2629 client_context.options |= client_options
2630 server_context = ssl.SSLContext(server_protocol)
2631 server_context.options |= server_options
2632
2633 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2634 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2635 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002636 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002637 client_context.set_ciphers("ALL")
2638
2639 for ctx in (client_context, server_context):
2640 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002641 ctx.load_cert_chain(SIGNED_CERTFILE)
2642 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002643 try:
2644 stats = server_params_test(client_context, server_context,
2645 chatty=False, connectionchatty=False)
2646 # Protocol mismatch can result in either an SSLError, or a
2647 # "Connection reset by peer" error.
2648 except ssl.SSLError:
2649 if expect_success:
2650 raise
2651 except OSError as e:
2652 if expect_success or e.errno != errno.ECONNRESET:
2653 raise
2654 else:
2655 if not expect_success:
2656 raise AssertionError(
2657 "Client protocol %s succeeded with server protocol %s!"
2658 % (ssl.get_protocol_name(client_protocol),
2659 ssl.get_protocol_name(server_protocol)))
2660 elif (expect_success is not True
2661 and expect_success != stats['version']):
2662 raise AssertionError("version mismatch: expected %r, got %r"
2663 % (expect_success, stats['version']))
2664
2665
2666class ThreadedTests(unittest.TestCase):
2667
2668 @skip_if_broken_ubuntu_ssl
2669 def test_echo(self):
2670 """Basic test of an SSL client connecting to a server"""
2671 if support.verbose:
2672 sys.stdout.write("\n")
2673 for protocol in PROTOCOLS:
2674 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2675 continue
2676 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2677 context = ssl.SSLContext(protocol)
2678 context.load_cert_chain(CERTFILE)
2679 server_params_test(context, context,
2680 chatty=True, connectionchatty=True)
2681
Christian Heimesa170fa12017-09-15 20:27:30 +02002682 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683
2684 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2685 server_params_test(client_context=client_context,
2686 server_context=server_context,
2687 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002688 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002689
2690 client_context.check_hostname = False
2691 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2692 with self.assertRaises(ssl.SSLError) as e:
2693 server_params_test(client_context=server_context,
2694 server_context=client_context,
2695 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002696 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002697 self.assertIn('called a function you should not call',
2698 str(e.exception))
2699
2700 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2701 with self.assertRaises(ssl.SSLError) as e:
2702 server_params_test(client_context=server_context,
2703 server_context=server_context,
2704 chatty=True, connectionchatty=True)
2705 self.assertIn('called a function you should not call',
2706 str(e.exception))
2707
2708 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2709 with self.assertRaises(ssl.SSLError) as e:
2710 server_params_test(client_context=server_context,
2711 server_context=client_context,
2712 chatty=True, connectionchatty=True)
2713 self.assertIn('called a function you should not call',
2714 str(e.exception))
2715
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002716 def test_getpeercert(self):
2717 if support.verbose:
2718 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002719
2720 client_context, server_context, hostname = testing_context()
2721 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002722 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002723 with client_context.wrap_socket(socket.socket(),
2724 do_handshake_on_connect=False,
2725 server_hostname=hostname) as s:
2726 s.connect((HOST, server.port))
2727 # getpeercert() raise ValueError while the handshake isn't
2728 # done.
2729 with self.assertRaises(ValueError):
2730 s.getpeercert()
2731 s.do_handshake()
2732 cert = s.getpeercert()
2733 self.assertTrue(cert, "Can't get peer certificate.")
2734 cipher = s.cipher()
2735 if support.verbose:
2736 sys.stdout.write(pprint.pformat(cert) + '\n')
2737 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2738 if 'subject' not in cert:
2739 self.fail("No subject field in certificate: %s." %
2740 pprint.pformat(cert))
2741 if ((('organizationName', 'Python Software Foundation'),)
2742 not in cert['subject']):
2743 self.fail(
2744 "Missing or invalid 'organizationName' field in certificate subject; "
2745 "should be 'Python Software Foundation'.")
2746 self.assertIn('notBefore', cert)
2747 self.assertIn('notAfter', cert)
2748 before = ssl.cert_time_to_seconds(cert['notBefore'])
2749 after = ssl.cert_time_to_seconds(cert['notAfter'])
2750 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002751
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002752 @unittest.skipUnless(have_verify_flags(),
2753 "verify_flags need OpenSSL > 0.9.8")
2754 def test_crl_check(self):
2755 if support.verbose:
2756 sys.stdout.write("\n")
2757
Christian Heimesa170fa12017-09-15 20:27:30 +02002758 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002759
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002760 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002761 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002762
2763 # VERIFY_DEFAULT should pass
2764 server = ThreadedEchoServer(context=server_context, chatty=True)
2765 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002766 with client_context.wrap_socket(socket.socket(),
2767 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002768 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 cert = s.getpeercert()
2770 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002771
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002772 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002773 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002774
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002775 server = ThreadedEchoServer(context=server_context, chatty=True)
2776 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002777 with client_context.wrap_socket(socket.socket(),
2778 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002779 with self.assertRaisesRegex(ssl.SSLError,
2780 "certificate verify failed"):
2781 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002782
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002783 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002784 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002785
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002786 server = ThreadedEchoServer(context=server_context, chatty=True)
2787 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002788 with client_context.wrap_socket(socket.socket(),
2789 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002790 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002791 cert = s.getpeercert()
2792 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002793
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002794 def test_check_hostname(self):
2795 if support.verbose:
2796 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002797
Christian Heimesa170fa12017-09-15 20:27:30 +02002798 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002799
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002800 # correct hostname should verify
2801 server = ThreadedEchoServer(context=server_context, chatty=True)
2802 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002803 with client_context.wrap_socket(socket.socket(),
2804 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002805 s.connect((HOST, server.port))
2806 cert = s.getpeercert()
2807 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002808
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002809 # incorrect hostname should raise an exception
2810 server = ThreadedEchoServer(context=server_context, chatty=True)
2811 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002812 with client_context.wrap_socket(socket.socket(),
2813 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002814 with self.assertRaisesRegex(
2815 ssl.CertificateError,
2816 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002817 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002818
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002819 # missing server_hostname arg should cause an exception, too
2820 server = ThreadedEchoServer(context=server_context, chatty=True)
2821 with server:
2822 with socket.socket() as s:
2823 with self.assertRaisesRegex(ValueError,
2824 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002825 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002826
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002827 def test_ecc_cert(self):
2828 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2829 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002830 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002831 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2832
2833 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2834 # load ECC cert
2835 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2836
2837 # correct hostname should verify
2838 server = ThreadedEchoServer(context=server_context, chatty=True)
2839 with server:
2840 with client_context.wrap_socket(socket.socket(),
2841 server_hostname=hostname) as s:
2842 s.connect((HOST, server.port))
2843 cert = s.getpeercert()
2844 self.assertTrue(cert, "Can't get peer certificate.")
2845 cipher = s.cipher()[0].split('-')
2846 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2847
2848 def test_dual_rsa_ecc(self):
2849 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2850 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002851 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2852 # algorithms.
2853 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002854 # only ECDSA certs
2855 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2856 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2857
2858 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2859 # load ECC and RSA key/cert pairs
2860 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2861 server_context.load_cert_chain(SIGNED_CERTFILE)
2862
2863 # correct hostname should verify
2864 server = ThreadedEchoServer(context=server_context, chatty=True)
2865 with server:
2866 with client_context.wrap_socket(socket.socket(),
2867 server_hostname=hostname) as s:
2868 s.connect((HOST, server.port))
2869 cert = s.getpeercert()
2870 self.assertTrue(cert, "Can't get peer certificate.")
2871 cipher = s.cipher()[0].split('-')
2872 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2873
Christian Heimes66e57422018-01-29 14:25:13 +01002874 def test_check_hostname_idn(self):
2875 if support.verbose:
2876 sys.stdout.write("\n")
2877
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002878 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002879 server_context.load_cert_chain(IDNSANSFILE)
2880
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002881 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002882 context.verify_mode = ssl.CERT_REQUIRED
2883 context.check_hostname = True
2884 context.load_verify_locations(SIGNING_CA)
2885
2886 # correct hostname should verify, when specified in several
2887 # different ways
2888 idn_hostnames = [
2889 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002890 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002891 ('xn--knig-5qa.idn.pythontest.net',
2892 'xn--knig-5qa.idn.pythontest.net'),
2893 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002894 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002895
2896 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002897 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002898 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2899 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2900 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002901 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2902
2903 # ('königsgäßchen.idna2008.pythontest.net',
2904 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2905 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2906 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2907 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2908 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2909
Christian Heimes66e57422018-01-29 14:25:13 +01002910 ]
2911 for server_hostname, expected_hostname in idn_hostnames:
2912 server = ThreadedEchoServer(context=server_context, chatty=True)
2913 with server:
2914 with context.wrap_socket(socket.socket(),
2915 server_hostname=server_hostname) as s:
2916 self.assertEqual(s.server_hostname, expected_hostname)
2917 s.connect((HOST, server.port))
2918 cert = s.getpeercert()
2919 self.assertEqual(s.server_hostname, expected_hostname)
2920 self.assertTrue(cert, "Can't get peer certificate.")
2921
Christian Heimes66e57422018-01-29 14:25:13 +01002922 # incorrect hostname should raise an exception
2923 server = ThreadedEchoServer(context=server_context, chatty=True)
2924 with server:
2925 with context.wrap_socket(socket.socket(),
2926 server_hostname="python.example.org") as s:
2927 with self.assertRaises(ssl.CertificateError):
2928 s.connect((HOST, server.port))
2929
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002930 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 """Connecting when the server rejects the client's certificate
2932
2933 Launch a server with CERT_REQUIRED, and check that trying to
2934 connect to it with a wrong client certificate fails.
2935 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002936 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002937 # load client cert that is not signed by trusted CA
2938 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002939 # require TLS client authentication
2940 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002941 # TLS 1.3 has different handshake
2942 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002943
2944 server = ThreadedEchoServer(
2945 context=server_context, chatty=True, connectionchatty=True,
2946 )
2947
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002949 client_context.wrap_socket(socket.socket(),
2950 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002951 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 # Expect either an SSL error about the server rejecting
2953 # the connection, or a low-level connection reset (which
2954 # sometimes happens on Windows)
2955 s.connect((HOST, server.port))
2956 except ssl.SSLError as e:
2957 if support.verbose:
2958 sys.stdout.write("\nSSLError is %r\n" % e)
2959 except OSError as e:
2960 if e.errno != errno.ECONNRESET:
2961 raise
2962 if support.verbose:
2963 sys.stdout.write("\nsocket.error is %r\n" % e)
2964 else:
2965 self.fail("Use of invalid cert should have failed!")
2966
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002967 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2968 def test_wrong_cert_tls13(self):
2969 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002970 # load client cert that is not signed by trusted CA
2971 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002972 server_context.verify_mode = ssl.CERT_REQUIRED
2973 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2974 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2975
2976 server = ThreadedEchoServer(
2977 context=server_context, chatty=True, connectionchatty=True,
2978 )
2979 with server, \
2980 client_context.wrap_socket(socket.socket(),
2981 server_hostname=hostname) as s:
2982 # TLS 1.3 perform client cert exchange after handshake
2983 s.connect((HOST, server.port))
2984 try:
2985 s.write(b'data')
2986 s.read(4)
2987 except ssl.SSLError as e:
2988 if support.verbose:
2989 sys.stdout.write("\nSSLError is %r\n" % e)
2990 except OSError as e:
2991 if e.errno != errno.ECONNRESET:
2992 raise
2993 if support.verbose:
2994 sys.stdout.write("\nsocket.error is %r\n" % e)
2995 else:
2996 self.fail("Use of invalid cert should have failed!")
2997
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002998 def test_rude_shutdown(self):
2999 """A brutal shutdown of an SSL server should raise an OSError
3000 in the client when attempting handshake.
3001 """
3002 listener_ready = threading.Event()
3003 listener_gone = threading.Event()
3004
3005 s = socket.socket()
3006 port = support.bind_port(s, HOST)
3007
3008 # `listener` runs in a thread. It sits in an accept() until
3009 # the main thread connects. Then it rudely closes the socket,
3010 # and sets Event `listener_gone` to let the main thread know
3011 # the socket is gone.
3012 def listener():
3013 s.listen()
3014 listener_ready.set()
3015 newsock, addr = s.accept()
3016 newsock.close()
3017 s.close()
3018 listener_gone.set()
3019
3020 def connector():
3021 listener_ready.wait()
3022 with socket.socket() as c:
3023 c.connect((HOST, port))
3024 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003025 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003026 ssl_sock = test_wrap_socket(c)
3027 except OSError:
3028 pass
3029 else:
3030 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003031
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003032 t = threading.Thread(target=listener)
3033 t.start()
3034 try:
3035 connector()
3036 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003037 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003038
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003039 def test_ssl_cert_verify_error(self):
3040 if support.verbose:
3041 sys.stdout.write("\n")
3042
3043 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3044 server_context.load_cert_chain(SIGNED_CERTFILE)
3045
3046 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3047
3048 server = ThreadedEchoServer(context=server_context, chatty=True)
3049 with server:
3050 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003051 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003052 try:
3053 s.connect((HOST, server.port))
3054 except ssl.SSLError as e:
3055 msg = 'unable to get local issuer certificate'
3056 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3057 self.assertEqual(e.verify_code, 20)
3058 self.assertEqual(e.verify_message, msg)
3059 self.assertIn(msg, repr(e))
3060 self.assertIn('certificate verify failed', repr(e))
3061
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003062 @skip_if_broken_ubuntu_ssl
3063 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3064 "OpenSSL is compiled without SSLv2 support")
3065 def test_protocol_sslv2(self):
3066 """Connecting to an SSLv2 server with various client options"""
3067 if support.verbose:
3068 sys.stdout.write("\n")
3069 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3070 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3071 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003072 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003073 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3074 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3075 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3076 # SSLv23 client with specific SSL options
3077 if no_sslv2_implies_sslv3_hello():
3078 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003079 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003080 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003081 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003082 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003083 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02003087 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003088 """Connecting to an SSLv23 server with various client options"""
3089 if support.verbose:
3090 sys.stdout.write("\n")
3091 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003092 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003093 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003094 except OSError as x:
3095 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3096 if support.verbose:
3097 sys.stdout.write(
3098 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3099 % str(x))
3100 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003101 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3102 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3103 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003104
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003105 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003106 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3107 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3108 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003109
3110 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003111 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3112 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3113 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003114
3115 # Server with specific SSL options
3116 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003117 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003118 server_options=ssl.OP_NO_SSLv3)
3119 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003120 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003121 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003122 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003123 server_options=ssl.OP_NO_TLSv1)
3124
3125
3126 @skip_if_broken_ubuntu_ssl
3127 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3128 "OpenSSL is compiled without SSLv3 support")
3129 def test_protocol_sslv3(self):
3130 """Connecting to an SSLv3 server with various client options"""
3131 if support.verbose:
3132 sys.stdout.write("\n")
3133 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3134 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3135 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3136 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3137 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003138 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003139 client_options=ssl.OP_NO_SSLv3)
3140 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3141 if no_sslv2_implies_sslv3_hello():
3142 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003143 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003144 False, client_options=ssl.OP_NO_SSLv2)
3145
3146 @skip_if_broken_ubuntu_ssl
3147 def test_protocol_tlsv1(self):
3148 """Connecting to a TLSv1 server with various client options"""
3149 if support.verbose:
3150 sys.stdout.write("\n")
3151 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3152 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3153 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3154 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3155 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3156 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3157 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003158 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003159 client_options=ssl.OP_NO_TLSv1)
3160
3161 @skip_if_broken_ubuntu_ssl
3162 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3163 "TLS version 1.1 not supported.")
3164 def test_protocol_tlsv1_1(self):
3165 """Connecting to a TLSv1.1 server with various client options.
3166 Testing against older TLS versions."""
3167 if support.verbose:
3168 sys.stdout.write("\n")
3169 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3170 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3171 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3172 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3173 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003174 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003175 client_options=ssl.OP_NO_TLSv1_1)
3176
Christian Heimesa170fa12017-09-15 20:27:30 +02003177 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003178 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3179 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3180
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003181 @skip_if_broken_ubuntu_ssl
3182 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3183 "TLS version 1.2 not supported.")
3184 def test_protocol_tlsv1_2(self):
3185 """Connecting to a TLSv1.2 server with various client options.
3186 Testing against older TLS versions."""
3187 if support.verbose:
3188 sys.stdout.write("\n")
3189 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3190 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3191 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3192 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3193 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3194 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3195 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003196 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003197 client_options=ssl.OP_NO_TLSv1_2)
3198
Christian Heimesa170fa12017-09-15 20:27:30 +02003199 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003200 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3201 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3202 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3203 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3204
3205 def test_starttls(self):
3206 """Switching from clear text to encrypted and back again."""
3207 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3208
3209 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003210 starttls_server=True,
3211 chatty=True,
3212 connectionchatty=True)
3213 wrapped = False
3214 with server:
3215 s = socket.socket()
3216 s.setblocking(1)
3217 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003218 if support.verbose:
3219 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003220 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003221 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003222 sys.stdout.write(
3223 " client: sending %r...\n" % indata)
3224 if wrapped:
3225 conn.write(indata)
3226 outdata = conn.read()
3227 else:
3228 s.send(indata)
3229 outdata = s.recv(1024)
3230 msg = outdata.strip().lower()
3231 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3232 # STARTTLS ok, switch to secure mode
3233 if support.verbose:
3234 sys.stdout.write(
3235 " client: read %r from server, starting TLS...\n"
3236 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003237 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003238 wrapped = True
3239 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3240 # ENDTLS ok, switch back to clear text
3241 if support.verbose:
3242 sys.stdout.write(
3243 " client: read %r from server, ending TLS...\n"
3244 % msg)
3245 s = conn.unwrap()
3246 wrapped = False
3247 else:
3248 if support.verbose:
3249 sys.stdout.write(
3250 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003251 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003252 sys.stdout.write(" client: closing connection.\n")
3253 if wrapped:
3254 conn.write(b"over\n")
3255 else:
3256 s.send(b"over\n")
3257 if wrapped:
3258 conn.close()
3259 else:
3260 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003261
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003262 def test_socketserver(self):
3263 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003264 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003265 # try to connect
3266 if support.verbose:
3267 sys.stdout.write('\n')
3268 with open(CERTFILE, 'rb') as f:
3269 d1 = f.read()
3270 d2 = ''
3271 # now fetch the same data from the HTTPS server
3272 url = 'https://localhost:%d/%s' % (
3273 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003274 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003275 f = urllib.request.urlopen(url, context=context)
3276 try:
3277 dlen = f.info().get("content-length")
3278 if dlen and (int(dlen) > 0):
3279 d2 = f.read(int(dlen))
3280 if support.verbose:
3281 sys.stdout.write(
3282 " client: read %d bytes from remote server '%s'\n"
3283 % (len(d2), server))
3284 finally:
3285 f.close()
3286 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003288 def test_asyncore_server(self):
3289 """Check the example asyncore integration."""
3290 if support.verbose:
3291 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003292
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003293 indata = b"FOO\n"
3294 server = AsyncoreEchoServer(CERTFILE)
3295 with server:
3296 s = test_wrap_socket(socket.socket())
3297 s.connect(('127.0.0.1', server.port))
3298 if support.verbose:
3299 sys.stdout.write(
3300 " client: sending %r...\n" % indata)
3301 s.write(indata)
3302 outdata = s.read()
3303 if support.verbose:
3304 sys.stdout.write(" client: read %r\n" % outdata)
3305 if outdata != indata.lower():
3306 self.fail(
3307 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3308 % (outdata[:20], len(outdata),
3309 indata[:20].lower(), len(indata)))
3310 s.write(b"over\n")
3311 if support.verbose:
3312 sys.stdout.write(" client: closing connection.\n")
3313 s.close()
3314 if support.verbose:
3315 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003316
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003317 def test_recv_send(self):
3318 """Test recv(), send() and friends."""
3319 if support.verbose:
3320 sys.stdout.write("\n")
3321
3322 server = ThreadedEchoServer(CERTFILE,
3323 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003324 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 cacerts=CERTFILE,
3326 chatty=True,
3327 connectionchatty=False)
3328 with server:
3329 s = test_wrap_socket(socket.socket(),
3330 server_side=False,
3331 certfile=CERTFILE,
3332 ca_certs=CERTFILE,
3333 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003334 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003335 s.connect((HOST, server.port))
3336 # helper methods for standardising recv* method signatures
3337 def _recv_into():
3338 b = bytearray(b"\0"*100)
3339 count = s.recv_into(b)
3340 return b[:count]
3341
3342 def _recvfrom_into():
3343 b = bytearray(b"\0"*100)
3344 count, addr = s.recvfrom_into(b)
3345 return b[:count]
3346
3347 # (name, method, expect success?, *args, return value func)
3348 send_methods = [
3349 ('send', s.send, True, [], len),
3350 ('sendto', s.sendto, False, ["some.address"], len),
3351 ('sendall', s.sendall, True, [], lambda x: None),
3352 ]
3353 # (name, method, whether to expect success, *args)
3354 recv_methods = [
3355 ('recv', s.recv, True, []),
3356 ('recvfrom', s.recvfrom, False, ["some.address"]),
3357 ('recv_into', _recv_into, True, []),
3358 ('recvfrom_into', _recvfrom_into, False, []),
3359 ]
3360 data_prefix = "PREFIX_"
3361
3362 for (meth_name, send_meth, expect_success, args,
3363 ret_val_meth) in send_methods:
3364 indata = (data_prefix + meth_name).encode('ascii')
3365 try:
3366 ret = send_meth(indata, *args)
3367 msg = "sending with {}".format(meth_name)
3368 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3369 outdata = s.read()
3370 if outdata != indata.lower():
3371 self.fail(
3372 "While sending with <<{name:s}>> bad data "
3373 "<<{outdata:r}>> ({nout:d}) received; "
3374 "expected <<{indata:r}>> ({nin:d})\n".format(
3375 name=meth_name, outdata=outdata[:20],
3376 nout=len(outdata),
3377 indata=indata[:20], nin=len(indata)
3378 )
3379 )
3380 except ValueError as e:
3381 if expect_success:
3382 self.fail(
3383 "Failed to send with method <<{name:s}>>; "
3384 "expected to succeed.\n".format(name=meth_name)
3385 )
3386 if not str(e).startswith(meth_name):
3387 self.fail(
3388 "Method <<{name:s}>> failed with unexpected "
3389 "exception message: {exp:s}\n".format(
3390 name=meth_name, exp=e
3391 )
3392 )
3393
3394 for meth_name, recv_meth, expect_success, args in recv_methods:
3395 indata = (data_prefix + meth_name).encode('ascii')
3396 try:
3397 s.send(indata)
3398 outdata = recv_meth(*args)
3399 if outdata != indata.lower():
3400 self.fail(
3401 "While receiving with <<{name:s}>> bad data "
3402 "<<{outdata:r}>> ({nout:d}) received; "
3403 "expected <<{indata:r}>> ({nin:d})\n".format(
3404 name=meth_name, outdata=outdata[:20],
3405 nout=len(outdata),
3406 indata=indata[:20], nin=len(indata)
3407 )
3408 )
3409 except ValueError as e:
3410 if expect_success:
3411 self.fail(
3412 "Failed to receive with method <<{name:s}>>; "
3413 "expected to succeed.\n".format(name=meth_name)
3414 )
3415 if not str(e).startswith(meth_name):
3416 self.fail(
3417 "Method <<{name:s}>> failed with unexpected "
3418 "exception message: {exp:s}\n".format(
3419 name=meth_name, exp=e
3420 )
3421 )
3422 # consume data
3423 s.read()
3424
3425 # read(-1, buffer) is supported, even though read(-1) is not
3426 data = b"data"
3427 s.send(data)
3428 buffer = bytearray(len(data))
3429 self.assertEqual(s.read(-1, buffer), len(data))
3430 self.assertEqual(buffer, data)
3431
Christian Heimes888bbdc2017-09-07 14:18:21 -07003432 # sendall accepts bytes-like objects
3433 if ctypes is not None:
3434 ubyte = ctypes.c_ubyte * len(data)
3435 byteslike = ubyte.from_buffer_copy(data)
3436 s.sendall(byteslike)
3437 self.assertEqual(s.read(), data)
3438
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003439 # Make sure sendmsg et al are disallowed to avoid
3440 # inadvertent disclosure of data and/or corruption
3441 # of the encrypted data stream
Miss Islington (bot)6485aa62018-12-06 12:52:43 -08003442 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003443 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3444 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3445 self.assertRaises(NotImplementedError,
Miss Islington (bot)6485aa62018-12-06 12:52:43 -08003446 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003447 s.write(b"over\n")
3448
3449 self.assertRaises(ValueError, s.recv, -1)
3450 self.assertRaises(ValueError, s.read, -1)
3451
3452 s.close()
3453
3454 def test_recv_zero(self):
3455 server = ThreadedEchoServer(CERTFILE)
3456 server.__enter__()
3457 self.addCleanup(server.__exit__, None, None)
3458 s = socket.create_connection((HOST, server.port))
3459 self.addCleanup(s.close)
3460 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3461 self.addCleanup(s.close)
3462
3463 # recv/read(0) should return no data
3464 s.send(b"data")
3465 self.assertEqual(s.recv(0), b"")
3466 self.assertEqual(s.read(0), b"")
3467 self.assertEqual(s.read(), b"data")
3468
3469 # Should not block if the other end sends no data
3470 s.setblocking(False)
3471 self.assertEqual(s.recv(0), b"")
3472 self.assertEqual(s.recv_into(bytearray()), 0)
3473
3474 def test_nonblocking_send(self):
3475 server = ThreadedEchoServer(CERTFILE,
3476 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003477 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003478 cacerts=CERTFILE,
3479 chatty=True,
3480 connectionchatty=False)
3481 with server:
3482 s = test_wrap_socket(socket.socket(),
3483 server_side=False,
3484 certfile=CERTFILE,
3485 ca_certs=CERTFILE,
3486 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003487 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003488 s.connect((HOST, server.port))
3489 s.setblocking(False)
3490
3491 # If we keep sending data, at some point the buffers
3492 # will be full and the call will block
3493 buf = bytearray(8192)
3494 def fill_buffer():
3495 while True:
3496 s.send(buf)
3497 self.assertRaises((ssl.SSLWantWriteError,
3498 ssl.SSLWantReadError), fill_buffer)
3499
3500 # Now read all the output and discard it
3501 s.setblocking(True)
3502 s.close()
3503
3504 def test_handshake_timeout(self):
3505 # Issue #5103: SSL handshake must respect the socket timeout
3506 server = socket.socket(socket.AF_INET)
3507 host = "127.0.0.1"
3508 port = support.bind_port(server)
3509 started = threading.Event()
3510 finish = False
3511
3512 def serve():
3513 server.listen()
3514 started.set()
3515 conns = []
3516 while not finish:
3517 r, w, e = select.select([server], [], [], 0.1)
3518 if server in r:
3519 # Let the socket hang around rather than having
3520 # it closed by garbage collection.
3521 conns.append(server.accept()[0])
3522 for sock in conns:
3523 sock.close()
3524
3525 t = threading.Thread(target=serve)
3526 t.start()
3527 started.wait()
3528
3529 try:
3530 try:
3531 c = socket.socket(socket.AF_INET)
3532 c.settimeout(0.2)
3533 c.connect((host, port))
3534 # Will attempt handshake and time out
3535 self.assertRaisesRegex(socket.timeout, "timed out",
3536 test_wrap_socket, c)
3537 finally:
3538 c.close()
3539 try:
3540 c = socket.socket(socket.AF_INET)
3541 c = test_wrap_socket(c)
3542 c.settimeout(0.2)
3543 # Will attempt handshake and time out
3544 self.assertRaisesRegex(socket.timeout, "timed out",
3545 c.connect, (host, port))
3546 finally:
3547 c.close()
3548 finally:
3549 finish = True
3550 t.join()
3551 server.close()
3552
3553 def test_server_accept(self):
3554 # Issue #16357: accept() on a SSLSocket created through
3555 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003556 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003557 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003558 context.load_verify_locations(SIGNING_CA)
3559 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003560 server = socket.socket(socket.AF_INET)
3561 host = "127.0.0.1"
3562 port = support.bind_port(server)
3563 server = context.wrap_socket(server, server_side=True)
3564 self.assertTrue(server.server_side)
3565
3566 evt = threading.Event()
3567 remote = None
3568 peer = None
3569 def serve():
3570 nonlocal remote, peer
3571 server.listen()
3572 # Block on the accept and wait on the connection to close.
3573 evt.set()
3574 remote, peer = server.accept()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003575 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003576
3577 t = threading.Thread(target=serve)
3578 t.start()
3579 # Client wait until server setup and perform a connect.
3580 evt.wait()
3581 client = context.wrap_socket(socket.socket())
3582 client.connect((host, port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003583 client.send(b'data')
3584 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003585 client_addr = client.getsockname()
3586 client.close()
3587 t.join()
3588 remote.close()
3589 server.close()
3590 # Sanity checks.
3591 self.assertIsInstance(remote, ssl.SSLSocket)
3592 self.assertEqual(peer, client_addr)
3593
3594 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003595 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003596 with context.wrap_socket(socket.socket()) as sock:
3597 with self.assertRaises(OSError) as cm:
3598 sock.getpeercert()
3599 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3600
3601 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003602 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003603 with context.wrap_socket(socket.socket()) as sock:
3604 with self.assertRaises(OSError) as cm:
3605 sock.do_handshake()
3606 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3607
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003608 def test_no_shared_ciphers(self):
3609 client_context, server_context, hostname = testing_context()
3610 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3611 client_context.options |= ssl.OP_NO_TLSv1_3
3612 # Force different suites on client and master
3613 client_context.set_ciphers("AES128")
3614 server_context.set_ciphers("AES256")
3615 with ThreadedEchoServer(context=server_context) as server:
3616 with client_context.wrap_socket(socket.socket(),
3617 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003618 with self.assertRaises(OSError):
3619 s.connect((HOST, server.port))
3620 self.assertIn("no shared cipher", server.conn_errors[0])
3621
3622 def test_version_basic(self):
3623 """
3624 Basic tests for SSLSocket.version().
3625 More tests are done in the test_protocol_*() methods.
3626 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003627 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3628 context.check_hostname = False
3629 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003630 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003631 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003632 chatty=False) as server:
3633 with context.wrap_socket(socket.socket()) as s:
3634 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003635 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003636 s.connect((HOST, server.port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003637 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003638 self.assertEqual(s.version(), 'TLSv1.3')
3639 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003640 self.assertEqual(s.version(), 'TLSv1.2')
3641 else: # 0.9.8 to 1.0.1
3642 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003643 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003644 self.assertIs(s.version(), None)
3645
Christian Heimescb5b68a2017-09-07 18:07:00 -07003646 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3647 "test requires TLSv1.3 enabled OpenSSL")
3648 def test_tls1_3(self):
3649 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3650 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003651 context.options |= (
3652 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3653 )
3654 with ThreadedEchoServer(context=context) as server:
3655 with context.wrap_socket(socket.socket()) as s:
3656 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003657 self.assertIn(s.cipher()[0], {
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003658 'TLS_AES_256_GCM_SHA384',
3659 'TLS_CHACHA20_POLY1305_SHA256',
3660 'TLS_AES_128_GCM_SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003661 })
3662 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003663
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08003664 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3665 "required OpenSSL 1.1.0g")
3666 def test_min_max_version(self):
3667 client_context, server_context, hostname = testing_context()
3668 # client TLSv1.0 to 1.2
3669 client_context.minimum_version = ssl.TLSVersion.TLSv1
3670 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3671 # server only TLSv1.2
3672 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3673 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3674
3675 with ThreadedEchoServer(context=server_context) as server:
3676 with client_context.wrap_socket(socket.socket(),
3677 server_hostname=hostname) as s:
3678 s.connect((HOST, server.port))
3679 self.assertEqual(s.version(), 'TLSv1.2')
3680
3681 # client 1.0 to 1.2, server 1.0 to 1.1
3682 server_context.minimum_version = ssl.TLSVersion.TLSv1
3683 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3684
3685 with ThreadedEchoServer(context=server_context) as server:
3686 with client_context.wrap_socket(socket.socket(),
3687 server_hostname=hostname) as s:
3688 s.connect((HOST, server.port))
3689 self.assertEqual(s.version(), 'TLSv1.1')
3690
3691 # client 1.0, server 1.2 (mismatch)
3692 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3693 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3694 client_context.minimum_version = ssl.TLSVersion.TLSv1
3695 client_context.maximum_version = ssl.TLSVersion.TLSv1
3696 with ThreadedEchoServer(context=server_context) as server:
3697 with client_context.wrap_socket(socket.socket(),
3698 server_hostname=hostname) as s:
3699 with self.assertRaises(ssl.SSLError) as e:
3700 s.connect((HOST, server.port))
3701 self.assertIn("alert", str(e.exception))
3702
3703
3704 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3705 "required OpenSSL 1.1.0g")
3706 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3707 def test_min_max_version_sslv3(self):
3708 client_context, server_context, hostname = testing_context()
3709 server_context.minimum_version = ssl.TLSVersion.SSLv3
3710 client_context.minimum_version = ssl.TLSVersion.SSLv3
3711 client_context.maximum_version = ssl.TLSVersion.SSLv3
3712 with ThreadedEchoServer(context=server_context) as server:
3713 with client_context.wrap_socket(socket.socket(),
3714 server_hostname=hostname) as s:
3715 s.connect((HOST, server.port))
3716 self.assertEqual(s.version(), 'SSLv3')
3717
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003718 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3719 def test_default_ecdh_curve(self):
3720 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3721 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003722 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003723 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003724 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3725 # cipher name.
3726 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3728 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3729 # our default cipher list should prefer ECDH-based ciphers
3730 # automatically.
3731 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3732 context.set_ciphers("ECCdraft:ECDH")
3733 with ThreadedEchoServer(context=context) as server:
3734 with context.wrap_socket(socket.socket()) as s:
3735 s.connect((HOST, server.port))
3736 self.assertIn("ECDH", s.cipher()[0])
3737
3738 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3739 "'tls-unique' channel binding not available")
3740 def test_tls_unique_channel_binding(self):
3741 """Test tls-unique channel binding."""
3742 if support.verbose:
3743 sys.stdout.write("\n")
3744
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003745 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003746
3747 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003748 chatty=True,
3749 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003750
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003751 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003752 with client_context.wrap_socket(
3753 socket.socket(),
3754 server_hostname=hostname) as s:
3755 s.connect((HOST, server.port))
3756 # get the data
3757 cb_data = s.get_channel_binding("tls-unique")
3758 if support.verbose:
3759 sys.stdout.write(
3760 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003761
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003762 # check if it is sane
3763 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003764 if s.version() == 'TLSv1.3':
3765 self.assertEqual(len(cb_data), 48)
3766 else:
3767 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003768
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003769 # and compare with the peers version
3770 s.write(b"CB tls-unique\n")
3771 peer_data_repr = s.read().strip()
3772 self.assertEqual(peer_data_repr,
3773 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003774
3775 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003776 with client_context.wrap_socket(
3777 socket.socket(),
3778 server_hostname=hostname) as s:
3779 s.connect((HOST, server.port))
3780 new_cb_data = s.get_channel_binding("tls-unique")
3781 if support.verbose:
3782 sys.stdout.write(
3783 "got another channel binding data: {0!r}\n".format(
3784 new_cb_data)
3785 )
3786 # is it really unique
3787 self.assertNotEqual(cb_data, new_cb_data)
3788 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003789 if s.version() == 'TLSv1.3':
3790 self.assertEqual(len(cb_data), 48)
3791 else:
3792 self.assertEqual(len(cb_data), 12) # True for TLSv1
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003793 s.write(b"CB tls-unique\n")
3794 peer_data_repr = s.read().strip()
3795 self.assertEqual(peer_data_repr,
3796 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003797
3798 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003799 client_context, server_context, hostname = testing_context()
3800 stats = server_params_test(client_context, server_context,
3801 chatty=True, connectionchatty=True,
3802 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003803 if support.verbose:
3804 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3805 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3806
3807 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3808 "ssl.OP_NO_COMPRESSION needed for this test")
3809 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003810 client_context, server_context, hostname = testing_context()
3811 client_context.options |= ssl.OP_NO_COMPRESSION
3812 server_context.options |= ssl.OP_NO_COMPRESSION
3813 stats = server_params_test(client_context, server_context,
3814 chatty=True, connectionchatty=True,
3815 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003816 self.assertIs(stats['compression'], None)
3817
3818 def test_dh_params(self):
3819 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003820 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003821 # test scenario needs TLS <= 1.2
3822 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003823 server_context.load_dh_params(DHFILE)
3824 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003825 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003826 stats = server_params_test(client_context, server_context,
3827 chatty=True, connectionchatty=True,
3828 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003829 cipher = stats["cipher"][0]
3830 parts = cipher.split("-")
3831 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3832 self.fail("Non-DH cipher: " + cipher[0])
3833
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003834 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003835 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003836 def test_ecdh_curve(self):
3837 # server secp384r1, client auto
3838 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003839
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003840 server_context.set_ecdh_curve("secp384r1")
3841 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3842 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3843 stats = server_params_test(client_context, server_context,
3844 chatty=True, connectionchatty=True,
3845 sni_name=hostname)
3846
3847 # server auto, client secp384r1
3848 client_context, server_context, hostname = testing_context()
3849 client_context.set_ecdh_curve("secp384r1")
3850 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3851 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3852 stats = server_params_test(client_context, server_context,
3853 chatty=True, connectionchatty=True,
3854 sni_name=hostname)
3855
3856 # server / client curve mismatch
3857 client_context, server_context, hostname = testing_context()
3858 client_context.set_ecdh_curve("prime256v1")
3859 server_context.set_ecdh_curve("secp384r1")
3860 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3861 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3862 try:
3863 stats = server_params_test(client_context, server_context,
3864 chatty=True, connectionchatty=True,
3865 sni_name=hostname)
3866 except ssl.SSLError:
3867 pass
3868 else:
3869 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003870 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003871 self.fail("mismatch curve did not fail")
3872
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003873 def test_selected_alpn_protocol(self):
3874 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003875 client_context, server_context, hostname = testing_context()
3876 stats = server_params_test(client_context, server_context,
3877 chatty=True, connectionchatty=True,
3878 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003879 self.assertIs(stats['client_alpn_protocol'], None)
3880
3881 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3882 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3883 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003884 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003885 server_context.set_alpn_protocols(['foo', 'bar'])
3886 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003887 chatty=True, connectionchatty=True,
3888 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003889 self.assertIs(stats['client_alpn_protocol'], None)
3890
3891 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3892 def test_alpn_protocols(self):
3893 server_protocols = ['foo', 'bar', 'milkshake']
3894 protocol_tests = [
3895 (['foo', 'bar'], 'foo'),
3896 (['bar', 'foo'], 'foo'),
3897 (['milkshake'], 'milkshake'),
3898 (['http/3.0', 'http/4.0'], None)
3899 ]
3900 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003901 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003902 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003903 client_context.set_alpn_protocols(client_protocols)
3904
3905 try:
3906 stats = server_params_test(client_context,
3907 server_context,
3908 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003909 connectionchatty=True,
3910 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003911 except ssl.SSLError as e:
3912 stats = e
3913
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003914 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003915 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3916 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3917 self.assertIsInstance(stats, ssl.SSLError)
3918 else:
3919 msg = "failed trying %s (s) and %s (c).\n" \
3920 "was expecting %s, but got %%s from the %%s" \
3921 % (str(server_protocols), str(client_protocols),
3922 str(expected))
3923 client_result = stats['client_alpn_protocol']
3924 self.assertEqual(client_result, expected,
3925 msg % (client_result, "client"))
3926 server_result = stats['server_alpn_protocols'][-1] \
3927 if len(stats['server_alpn_protocols']) else 'nothing'
3928 self.assertEqual(server_result, expected,
3929 msg % (server_result, "server"))
3930
3931 def test_selected_npn_protocol(self):
3932 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003933 client_context, server_context, hostname = testing_context()
3934 stats = server_params_test(client_context, server_context,
3935 chatty=True, connectionchatty=True,
3936 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003937 self.assertIs(stats['client_npn_protocol'], None)
3938
3939 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3940 def test_npn_protocols(self):
3941 server_protocols = ['http/1.1', 'spdy/2']
3942 protocol_tests = [
3943 (['http/1.1', 'spdy/2'], 'http/1.1'),
3944 (['spdy/2', 'http/1.1'], 'http/1.1'),
3945 (['spdy/2', 'test'], 'spdy/2'),
3946 (['abc', 'def'], 'abc')
3947 ]
3948 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003949 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003950 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003951 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003952 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003953 chatty=True, connectionchatty=True,
3954 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003955 msg = "failed trying %s (s) and %s (c).\n" \
3956 "was expecting %s, but got %%s from the %%s" \
3957 % (str(server_protocols), str(client_protocols),
3958 str(expected))
3959 client_result = stats['client_npn_protocol']
3960 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3961 server_result = stats['server_npn_protocols'][-1] \
3962 if len(stats['server_npn_protocols']) else 'nothing'
3963 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3964
3965 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003966 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003967 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003968 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003969 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003970 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003971 client_context.load_verify_locations(SIGNING_CA)
3972 return server_context, other_context, client_context
3973
3974 def check_common_name(self, stats, name):
3975 cert = stats['peercert']
3976 self.assertIn((('commonName', name),), cert['subject'])
3977
3978 @needs_sni
3979 def test_sni_callback(self):
3980 calls = []
3981 server_context, other_context, client_context = self.sni_contexts()
3982
Christian Heimesa170fa12017-09-15 20:27:30 +02003983 client_context.check_hostname = False
3984
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003985 def servername_cb(ssl_sock, server_name, initial_context):
3986 calls.append((server_name, initial_context))
3987 if server_name is not None:
3988 ssl_sock.context = other_context
3989 server_context.set_servername_callback(servername_cb)
3990
3991 stats = server_params_test(client_context, server_context,
3992 chatty=True,
3993 sni_name='supermessage')
3994 # The hostname was fetched properly, and the certificate was
3995 # changed for the connection.
3996 self.assertEqual(calls, [("supermessage", server_context)])
3997 # CERTFILE4 was selected
3998 self.check_common_name(stats, 'fakehostname')
3999
4000 calls = []
4001 # The callback is called with server_name=None
4002 stats = server_params_test(client_context, server_context,
4003 chatty=True,
4004 sni_name=None)
4005 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004006 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004007
4008 # Check disabling the callback
4009 calls = []
4010 server_context.set_servername_callback(None)
4011
4012 stats = server_params_test(client_context, server_context,
4013 chatty=True,
4014 sni_name='notfunny')
4015 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004016 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004017 self.assertEqual(calls, [])
4018
4019 @needs_sni
4020 def test_sni_callback_alert(self):
4021 # Returning a TLS alert is reflected to the connecting client
4022 server_context, other_context, client_context = self.sni_contexts()
4023
4024 def cb_returning_alert(ssl_sock, server_name, initial_context):
4025 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4026 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004027 with self.assertRaises(ssl.SSLError) as cm:
4028 stats = server_params_test(client_context, server_context,
4029 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004030 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004031 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 @needs_sni
4034 def test_sni_callback_raising(self):
4035 # Raising fails the connection with a TLS handshake failure alert.
4036 server_context, other_context, client_context = self.sni_contexts()
4037
4038 def cb_raising(ssl_sock, server_name, initial_context):
4039 1/0
4040 server_context.set_servername_callback(cb_raising)
4041
4042 with self.assertRaises(ssl.SSLError) as cm, \
4043 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004044 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004045 chatty=False,
4046 sni_name='supermessage')
4047 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4048 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004049
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004050 @needs_sni
4051 def test_sni_callback_wrong_return_type(self):
4052 # Returning the wrong return type terminates the TLS connection
4053 # with an internal error alert.
4054 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004055
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004056 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4057 return "foo"
4058 server_context.set_servername_callback(cb_wrong_return_type)
4059
4060 with self.assertRaises(ssl.SSLError) as cm, \
4061 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004062 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004063 chatty=False,
4064 sni_name='supermessage')
4065 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4066 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004067
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004068 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004069 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004070 client_context.set_ciphers("AES128:AES256")
4071 server_context.set_ciphers("AES256")
4072 expected_algs = [
4073 "AES256", "AES-256",
4074 # TLS 1.3 ciphers are always enabled
4075 "TLS_CHACHA20", "TLS_AES",
4076 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004077
Christian Heimesa170fa12017-09-15 20:27:30 +02004078 stats = server_params_test(client_context, server_context,
4079 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004080 ciphers = stats['server_shared_ciphers'][0]
4081 self.assertGreater(len(ciphers), 0)
4082 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004083 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004084 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004086 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004087 client_context, server_context, hostname = testing_context()
4088 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004089
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004090 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004091 s = client_context.wrap_socket(socket.socket(),
4092 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 s.connect((HOST, server.port))
4094 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004095
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 self.assertRaises(ValueError, s.read, 1024)
4097 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004098
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004099 def test_sendfile(self):
4100 TEST_DATA = b"x" * 512
4101 with open(support.TESTFN, 'wb') as f:
4102 f.write(TEST_DATA)
4103 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004104 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004105 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004106 context.load_verify_locations(SIGNING_CA)
4107 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004108 server = ThreadedEchoServer(context=context, chatty=False)
4109 with server:
4110 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004111 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004112 with open(support.TESTFN, 'rb') as file:
4113 s.sendfile(file)
4114 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004115
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004116 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004117 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004118 # TODO: sessions aren't compatible with TLSv1.3 yet
4119 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004120
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004121 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004122 stats = server_params_test(client_context, server_context,
4123 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004124 session = stats['session']
4125 self.assertTrue(session.id)
4126 self.assertGreater(session.time, 0)
4127 self.assertGreater(session.timeout, 0)
4128 self.assertTrue(session.has_ticket)
4129 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4130 self.assertGreater(session.ticket_lifetime_hint, 0)
4131 self.assertFalse(stats['session_reused'])
4132 sess_stat = server_context.session_stats()
4133 self.assertEqual(sess_stat['accept'], 1)
4134 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004135
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004136 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004137 stats = server_params_test(client_context, server_context,
4138 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004139 sess_stat = server_context.session_stats()
4140 self.assertEqual(sess_stat['accept'], 2)
4141 self.assertEqual(sess_stat['hits'], 1)
4142 self.assertTrue(stats['session_reused'])
4143 session2 = stats['session']
4144 self.assertEqual(session2.id, session.id)
4145 self.assertEqual(session2, session)
4146 self.assertIsNot(session2, session)
4147 self.assertGreaterEqual(session2.time, session.time)
4148 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004149
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004150 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004151 stats = server_params_test(client_context, server_context,
4152 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004153 self.assertFalse(stats['session_reused'])
4154 session3 = stats['session']
4155 self.assertNotEqual(session3.id, session.id)
4156 self.assertNotEqual(session3, session)
4157 sess_stat = server_context.session_stats()
4158 self.assertEqual(sess_stat['accept'], 3)
4159 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004160
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004161 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004162 stats = server_params_test(client_context, server_context,
4163 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004164 self.assertTrue(stats['session_reused'])
4165 session4 = stats['session']
4166 self.assertEqual(session4.id, session.id)
4167 self.assertEqual(session4, session)
4168 self.assertGreaterEqual(session4.time, session.time)
4169 self.assertGreaterEqual(session4.timeout, session.timeout)
4170 sess_stat = server_context.session_stats()
4171 self.assertEqual(sess_stat['accept'], 4)
4172 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004173
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004174 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004175 client_context, server_context, hostname = testing_context()
4176 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004177
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004178 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004179 client_context.options |= ssl.OP_NO_TLSv1_3
4180 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004181
Christian Heimesa170fa12017-09-15 20:27:30 +02004182 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004183 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004184 with client_context.wrap_socket(socket.socket(),
4185 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004186 # session is None before handshake
4187 self.assertEqual(s.session, None)
4188 self.assertEqual(s.session_reused, None)
4189 s.connect((HOST, server.port))
4190 session = s.session
4191 self.assertTrue(session)
4192 with self.assertRaises(TypeError) as e:
4193 s.session = object
Miss Islington (bot)42198572018-06-11 17:58:06 -07004194 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004195
Christian Heimesa170fa12017-09-15 20:27:30 +02004196 with client_context.wrap_socket(socket.socket(),
4197 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004198 s.connect((HOST, server.port))
4199 # cannot set session after handshake
4200 with self.assertRaises(ValueError) as e:
4201 s.session = session
4202 self.assertEqual(str(e.exception),
4203 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004204
Christian Heimesa170fa12017-09-15 20:27:30 +02004205 with client_context.wrap_socket(socket.socket(),
4206 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004207 # can set session before handshake and before the
4208 # connection was established
4209 s.session = session
4210 s.connect((HOST, server.port))
4211 self.assertEqual(s.session.id, session.id)
4212 self.assertEqual(s.session, session)
4213 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004214
Christian Heimesa170fa12017-09-15 20:27:30 +02004215 with client_context2.wrap_socket(socket.socket(),
4216 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004217 # cannot re-use session with a different SSLContext
4218 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004219 s.session = session
4220 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004221 self.assertEqual(str(e.exception),
4222 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004223
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004224
Christian Heimes2756ef32018-09-23 09:22:52 +02004225@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4226class TestPostHandshakeAuth(unittest.TestCase):
4227 def test_pha_setter(self):
4228 protocols = [
4229 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4230 ]
4231 for protocol in protocols:
4232 ctx = ssl.SSLContext(protocol)
4233 self.assertEqual(ctx.post_handshake_auth, False)
4234
4235 ctx.post_handshake_auth = True
4236 self.assertEqual(ctx.post_handshake_auth, True)
4237
4238 ctx.verify_mode = ssl.CERT_REQUIRED
4239 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4240 self.assertEqual(ctx.post_handshake_auth, True)
4241
4242 ctx.post_handshake_auth = False
4243 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4244 self.assertEqual(ctx.post_handshake_auth, False)
4245
4246 ctx.verify_mode = ssl.CERT_OPTIONAL
4247 ctx.post_handshake_auth = True
4248 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4249 self.assertEqual(ctx.post_handshake_auth, True)
4250
4251 def test_pha_required(self):
4252 client_context, server_context, hostname = testing_context()
4253 server_context.post_handshake_auth = True
4254 server_context.verify_mode = ssl.CERT_REQUIRED
4255 client_context.post_handshake_auth = True
4256 client_context.load_cert_chain(SIGNED_CERTFILE)
4257
4258 server = ThreadedEchoServer(context=server_context, chatty=False)
4259 with server:
4260 with client_context.wrap_socket(socket.socket(),
4261 server_hostname=hostname) as s:
4262 s.connect((HOST, server.port))
4263 s.write(b'HASCERT')
4264 self.assertEqual(s.recv(1024), b'FALSE\n')
4265 s.write(b'PHA')
4266 self.assertEqual(s.recv(1024), b'OK\n')
4267 s.write(b'HASCERT')
4268 self.assertEqual(s.recv(1024), b'TRUE\n')
4269 # PHA method just returns true when cert is already available
4270 s.write(b'PHA')
4271 self.assertEqual(s.recv(1024), b'OK\n')
4272 s.write(b'GETCERT')
4273 cert_text = s.recv(4096).decode('us-ascii')
4274 self.assertIn('Python Software Foundation CA', cert_text)
4275
4276 def test_pha_required_nocert(self):
4277 client_context, server_context, hostname = testing_context()
4278 server_context.post_handshake_auth = True
4279 server_context.verify_mode = ssl.CERT_REQUIRED
4280 client_context.post_handshake_auth = True
4281
4282 server = ThreadedEchoServer(context=server_context, chatty=False)
4283 with server:
4284 with client_context.wrap_socket(socket.socket(),
4285 server_hostname=hostname) as s:
4286 s.connect((HOST, server.port))
4287 s.write(b'PHA')
4288 # receive CertificateRequest
4289 self.assertEqual(s.recv(1024), b'OK\n')
4290 # send empty Certificate + Finish
4291 s.write(b'HASCERT')
4292 # receive alert
4293 with self.assertRaisesRegex(
4294 ssl.SSLError,
4295 'tlsv13 alert certificate required'):
4296 s.recv(1024)
4297
4298 def test_pha_optional(self):
4299 if support.verbose:
4300 sys.stdout.write("\n")
4301
4302 client_context, server_context, hostname = testing_context()
4303 server_context.post_handshake_auth = True
4304 server_context.verify_mode = ssl.CERT_REQUIRED
4305 client_context.post_handshake_auth = True
4306 client_context.load_cert_chain(SIGNED_CERTFILE)
4307
4308 # check CERT_OPTIONAL
4309 server_context.verify_mode = ssl.CERT_OPTIONAL
4310 server = ThreadedEchoServer(context=server_context, chatty=False)
4311 with server:
4312 with client_context.wrap_socket(socket.socket(),
4313 server_hostname=hostname) as s:
4314 s.connect((HOST, server.port))
4315 s.write(b'HASCERT')
4316 self.assertEqual(s.recv(1024), b'FALSE\n')
4317 s.write(b'PHA')
4318 self.assertEqual(s.recv(1024), b'OK\n')
4319 s.write(b'HASCERT')
4320 self.assertEqual(s.recv(1024), b'TRUE\n')
4321
4322 def test_pha_optional_nocert(self):
4323 if support.verbose:
4324 sys.stdout.write("\n")
4325
4326 client_context, server_context, hostname = testing_context()
4327 server_context.post_handshake_auth = True
4328 server_context.verify_mode = ssl.CERT_OPTIONAL
4329 client_context.post_handshake_auth = True
4330
4331 server = ThreadedEchoServer(context=server_context, chatty=False)
4332 with server:
4333 with client_context.wrap_socket(socket.socket(),
4334 server_hostname=hostname) as s:
4335 s.connect((HOST, server.port))
4336 s.write(b'HASCERT')
4337 self.assertEqual(s.recv(1024), b'FALSE\n')
4338 s.write(b'PHA')
4339 self.assertEqual(s.recv(1024), b'OK\n')
4340 # optional doens't fail when client does not have a cert
4341 s.write(b'HASCERT')
4342 self.assertEqual(s.recv(1024), b'FALSE\n')
4343
4344 def test_pha_no_pha_client(self):
4345 client_context, server_context, hostname = testing_context()
4346 server_context.post_handshake_auth = True
4347 server_context.verify_mode = ssl.CERT_REQUIRED
4348 client_context.load_cert_chain(SIGNED_CERTFILE)
4349
4350 server = ThreadedEchoServer(context=server_context, chatty=False)
4351 with server:
4352 with client_context.wrap_socket(socket.socket(),
4353 server_hostname=hostname) as s:
4354 s.connect((HOST, server.port))
4355 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4356 s.verify_client_post_handshake()
4357 s.write(b'PHA')
4358 self.assertIn(b'extension not received', s.recv(1024))
4359
4360 def test_pha_no_pha_server(self):
4361 # server doesn't have PHA enabled, cert is requested in handshake
4362 client_context, server_context, hostname = testing_context()
4363 server_context.verify_mode = ssl.CERT_REQUIRED
4364 client_context.post_handshake_auth = True
4365 client_context.load_cert_chain(SIGNED_CERTFILE)
4366
4367 server = ThreadedEchoServer(context=server_context, chatty=False)
4368 with server:
4369 with client_context.wrap_socket(socket.socket(),
4370 server_hostname=hostname) as s:
4371 s.connect((HOST, server.port))
4372 s.write(b'HASCERT')
4373 self.assertEqual(s.recv(1024), b'TRUE\n')
4374 # PHA doesn't fail if there is already a cert
4375 s.write(b'PHA')
4376 self.assertEqual(s.recv(1024), b'OK\n')
4377 s.write(b'HASCERT')
4378 self.assertEqual(s.recv(1024), b'TRUE\n')
4379
4380 def test_pha_not_tls13(self):
4381 # TLS 1.2
4382 client_context, server_context, hostname = testing_context()
4383 server_context.verify_mode = ssl.CERT_REQUIRED
4384 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4385 client_context.post_handshake_auth = True
4386 client_context.load_cert_chain(SIGNED_CERTFILE)
4387
4388 server = ThreadedEchoServer(context=server_context, chatty=False)
4389 with server:
4390 with client_context.wrap_socket(socket.socket(),
4391 server_hostname=hostname) as s:
4392 s.connect((HOST, server.port))
4393 # PHA fails for TLS != 1.3
4394 s.write(b'PHA')
4395 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4396
4397
Thomas Woutersed03b412007-08-28 21:37:11 +00004398def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004399 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004400 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004401 plats = {
4402 'Linux': platform.linux_distribution,
4403 'Mac': platform.mac_ver,
4404 'Windows': platform.win32_ver,
4405 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03004406 with warnings.catch_warnings():
4407 warnings.filterwarnings(
4408 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04004409 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03004410 'functions are deprecated .*',
Xtreakf5770f32018-07-05 22:29:46 +05304411 DeprecationWarning,
Berker Peksag9e7990a2015-05-16 23:21:26 +03004412 )
4413 for name, func in plats.items():
4414 plat = func()
4415 if plat and plat[0]:
4416 plat = '%s %r' % (name, plat)
4417 break
4418 else:
4419 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004420 print("test_ssl: testing with %r %r" %
4421 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4422 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004423 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004424 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4425 try:
4426 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4427 except AttributeError:
4428 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004429
Antoine Pitrou152efa22010-05-16 18:19:27 +00004430 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004431 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004432 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004433 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004434 BADCERT, BADKEY, EMPTYCERT]:
4435 if not os.path.exists(filename):
4436 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004437
Martin Panter3840b2a2016-03-27 01:53:46 +00004438 tests = [
4439 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004440 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes2756ef32018-09-23 09:22:52 +02004441 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004442 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004443
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004444 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004445 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004446
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004447 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004448 try:
4449 support.run_unittest(*tests)
4450 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004451 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004452
4453if __name__ == "__main__":
4454 test_main()