blob: 49a4d72954229504f63131a5baaa74c452d0d8ee [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
# Sorted view of ssl._PROTOCOL_NAMES (the protocol identifiers known to ssl).
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags derived from the version of the TLS library reported by the ssl module.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Value of the PY_SSL_DEFAULT_CIPHERS build configuration variable
# (None when sysconfig does not know the variable).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path built from this module's directory and *name*.

    Each positional argument is one path component, e.g.
    ``data_file("capath", "4e1295a3.0")``.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# The BYTES_* variants are the same paths encoded with os.fsencode().
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")

# Decoded form of CERTFILE that test_parse_cert compares against.
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
# Decoded form of SIGNED_CERTFILE that test_parse_cert compares against.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # ASN1_TIME_print() pads a single-digit day with a leading space, so
    # the day field is two characters wide ("Jul  7", not "Jul 7").
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
99
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately unusable inputs for the error-path tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)

# Not defined in all versions of OpenSSL
# (each name falls back to 0 when the ssl module lacks the constant)
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    The traceback is always formatted, but only written when
    ``support.verbose`` is true.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or newer."""
    # 0.9.8m or higher
    minimum_api_version = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or newer."""
    # 0.9.7h or higher
    minimum_version = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
143
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or newer."""
    # 0.9.8 or higher
    minimum_version = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
147
def _have_secp_curves():
    """Return True if a server context accepts the "secp384r1" ECDH curve.

    Returns False when ssl.HAS_ECDH is false or when set_ecdh_curve()
    rejects the curve name with ValueError.
    """
    if ssl.HAS_ECDH:
        probe_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe_ctx.set_ecdh_curve("secp384r1")
        except ValueError:
            pass
        else:
            return True
    return False


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
161
162
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds, using the DST offset when
    DST is currently in effect."""
    # local time = utc time + utc offset
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west  # seconds
168
def asn1time(cert_time):
    """Return *cert_time* adjusted for the linked OpenSSL's quirks.

    The string is returned unchanged unless the OpenSSL API version is
    exactly 0.9.8i, in which case the seconds field is zeroed.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned
    unchanged.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises SSLError on OpenSSL (0, 9, 8, 15, 15) with
    the matching distribution tuple, unittest.SkipTest is raised.  In
    every other case *func* is called normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            on_patched_build = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if on_patched_build:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
200
Antoine Pitrou23df4832010-08-04 17:14:06 +0000201
Christian Heimesd0486372016-09-10 23:23:33 +0200202def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
203 cert_reqs=ssl.CERT_NONE, ca_certs=None,
204 ciphers=None, certfile=None, keyfile=None,
205 **kwargs):
206 context = ssl.SSLContext(ssl_version)
207 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200208 if cert_reqs == ssl.CERT_NONE:
209 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200210 context.verify_mode = cert_reqs
211 if ca_certs is not None:
212 context.load_verify_locations(ca_certs)
213 if certfile is not None or keyfile is not None:
214 context.load_cert_chain(certfile, keyfile)
215 if ciphers is not None:
216 context.set_ciphers(ciphers)
217 return context.wrap_socket(sock, **kwargs)
218
Christian Heimesa170fa12017-09-15 20:27:30 +0200219
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  Both returned contexts trust SIGNING_CA.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # The CA is loaded into the *server* context here; the client context
    # already received it above.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
240
241
Antoine Pitrou152efa22010-05-16 18:19:27 +0000242class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000243
Antoine Pitrou480a1242010-04-28 21:37:09 +0000244 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000245 ssl.CERT_NONE
246 ssl.CERT_OPTIONAL
247 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100248 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100249 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100250 if ssl.HAS_ECDH:
251 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100252 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
253 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000254 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100255 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700256 ssl.OP_NO_SSLv2
257 ssl.OP_NO_SSLv3
258 ssl.OP_NO_TLSv1
259 ssl.OP_NO_TLSv1_3
260 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
261 ssl.OP_NO_TLSv1_1
262 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200263 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000264
Christian Heimes89c20512018-02-27 11:17:32 +0100265 def test_private_init(self):
266 with self.assertRaisesRegex(TypeError, "public constructor"):
267 with socket.socket() as s:
268 ssl.SSLSocket(s)
269
Antoine Pitrou172f0252014-04-18 20:33:08 +0200270 def test_str_for_enums(self):
271 # Make sure that the PROTOCOL_* constants have enum-like string
272 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200273 proto = ssl.PROTOCOL_TLS
274 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200275 ctx = ssl.SSLContext(proto)
276 self.assertIs(ctx.protocol, proto)
277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n RAND_status is %d (%s)\n"
282 % (v, (v and "sufficient randomness") or
283 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200284
285 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
286 self.assertEqual(len(data), 16)
287 self.assertEqual(is_cryptographic, v == 1)
288 if v:
289 data = ssl.RAND_bytes(16)
290 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200291 else:
292 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200293
Victor Stinner1e81a392013-12-19 16:47:04 +0100294 # negative num is invalid
295 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
296 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
297
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100298 if hasattr(ssl, 'RAND_egd'):
299 self.assertRaises(TypeError, ssl.RAND_egd, 1)
300 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000301 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200302 ssl.RAND_add(b"this is a random bytes object", 75.0)
303 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000304
    @unittest.skipUnless(os.name == 'posix', 'requires posix')
    def test_random_fork(self):
        """Check that a forked child and its parent get different bytes
        from RAND_pseudo_bytes()."""
        status = ssl.RAND_status()
        if not status:
            self.fail("OpenSSL's PRNG has insufficient randomness")

        # The child sends its random bytes to the parent through this pipe.
        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child process: write 16 random bytes and leave via os._exit().
            try:
                os.close(rfd)
                child_random = ssl.RAND_pseudo_bytes(16)[0]
                self.assertEqual(len(child_random), 16)
                os.write(wfd, child_random)
                os.close(wfd)
            except BaseException:
                # Any failure, including a failed assertion, becomes exit
                # status 1, which the parent checks below.
                os._exit(1)
            else:
                os._exit(0)
        else:
            # Parent process: wait for the child, then read what it wrote.
            os.close(wfd)
            self.addCleanup(os.close, rfd)
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0)

            child_random = os.read(rfd, 16)
            self.assertEqual(len(child_random), 16)
            parent_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(parent_random), 16)

            self.assertNotEqual(child_random, parent_random)
336
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -0700337 maxDiff = None
338
Antoine Pitrou480a1242010-04-28 21:37:09 +0000339 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000340 # note that this uses an 'unofficial' function in _ssl.c,
341 # provided solely for this test, to exercise the certificate
342 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100343 self.assertEqual(
344 ssl._ssl._test_decode_cert(CERTFILE),
345 CERTFILE_INFO
346 )
347 self.assertEqual(
348 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
349 SIGNED_CERTFILE_INFO
350 )
351
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200352 # Issue #13034: the subjectAltName in some certificates
353 # (notably projects.developer.nokia.com:443) wasn't parsed
354 p = ssl._ssl._test_decode_cert(NOKIACERT)
355 if support.verbose:
356 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
357 self.assertEqual(p['subjectAltName'],
358 (('DNS', 'projects.developer.nokia.com'),
359 ('DNS', 'projects.forum.nokia.com'))
360 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100361 # extra OCSP and AIA fields
362 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
363 self.assertEqual(p['caIssuers'],
364 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
365 self.assertEqual(p['crlDistributionPoints'],
366 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000367
Christian Heimes824f7f32013-08-17 00:54:47 +0200368 def test_parse_cert_CVE_2013_4238(self):
369 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
370 if support.verbose:
371 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
372 subject = ((('countryName', 'US'),),
373 (('stateOrProvinceName', 'Oregon'),),
374 (('localityName', 'Beaverton'),),
375 (('organizationName', 'Python Software Foundation'),),
376 (('organizationalUnitName', 'Python Core Development'),),
377 (('commonName', 'null.python.org\x00example.org'),),
378 (('emailAddress', 'python-dev@python.org'),))
379 self.assertEqual(p['subject'], subject)
380 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200381 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
382 san = (('DNS', 'altnull.python.org\x00example.com'),
383 ('email', 'null@python.org\x00user@example.org'),
384 ('URI', 'http://null.python.org\x00http://example.org'),
385 ('IP Address', '192.0.2.1'),
386 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
387 else:
388 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
389 san = (('DNS', 'altnull.python.org\x00example.com'),
390 ('email', 'null@python.org\x00user@example.org'),
391 ('URI', 'http://null.python.org\x00http://example.org'),
392 ('IP Address', '192.0.2.1'),
393 ('IP Address', '<invalid>'))
394
395 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200396
Christian Heimes1c03abd2016-09-06 23:25:35 +0200397 def test_parse_all_sans(self):
398 p = ssl._ssl._test_decode_cert(ALLSANFILE)
399 self.assertEqual(p['subjectAltName'],
400 (
401 ('DNS', 'allsans'),
402 ('othername', '<unsupported>'),
403 ('othername', '<unsupported>'),
404 ('email', 'user@example.org'),
405 ('DNS', 'www.example.org'),
406 ('DirName',
407 ((('countryName', 'XY'),),
408 (('localityName', 'Castle Anthrax'),),
409 (('organizationName', 'Python Software Foundation'),),
410 (('commonName', 'dirname example'),))),
411 ('URI', 'https://www.python.org/'),
412 ('IP Address', '127.0.0.1'),
413 ('IP Address', '0:0:0:0:0:0:0:1\n'),
414 ('Registered ID', '1.2.3.4.5')
415 )
416 )
417
Antoine Pitrou480a1242010-04-28 21:37:09 +0000418 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000419 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 d1 = ssl.PEM_cert_to_DER_cert(pem)
422 p2 = ssl.DER_cert_to_PEM_cert(d1)
423 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000424 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000425 if not p2.startswith(ssl.PEM_HEADER + '\n'):
426 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
427 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
428 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000429
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000430 def test_openssl_version(self):
431 n = ssl.OPENSSL_VERSION_NUMBER
432 t = ssl.OPENSSL_VERSION_INFO
433 s = ssl.OPENSSL_VERSION
434 self.assertIsInstance(n, int)
435 self.assertIsInstance(t, tuple)
436 self.assertIsInstance(s, str)
437 # Some sanity checks follow
438 # >= 0.9
439 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400440 # < 3.0
441 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000442 major, minor, fix, patch, status = t
443 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400444 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000445 self.assertGreaterEqual(minor, 0)
446 self.assertLess(minor, 256)
447 self.assertGreaterEqual(fix, 0)
448 self.assertLess(fix, 256)
449 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100450 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000451 self.assertGreaterEqual(status, 0)
452 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400453 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200454 if IS_LIBRESSL:
455 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100456 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400457 else:
458 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100459 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000460
Antoine Pitrou9d543662010-04-23 23:10:32 +0000461 @support.cpython_only
462 def test_refcycle(self):
463 # Issue #7943: an SSL object doesn't create reference cycles with
464 # itself.
465 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200466 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000467 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100468 with support.check_warnings(("", ResourceWarning)):
469 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100470 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000471
Antoine Pitroua468adc2010-09-14 14:43:44 +0000472 def test_wrapped_unconnected(self):
473 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200474 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000475 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100477 self.assertRaises(OSError, ss.recv, 1)
478 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
479 self.assertRaises(OSError, ss.recvfrom, 1)
480 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
481 self.assertRaises(OSError, ss.send, b'x')
482 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800483 self.assertRaises(NotImplementedError, ss.sendmsg,
484 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000485
Antoine Pitrou40f08742010-04-24 22:04:40 +0000486 def test_timeout(self):
487 # Issue #8524: when creating an SSL socket, the timeout of the
488 # original socket should be retained.
489 for timeout in (None, 0.0, 5.0):
490 s = socket.socket(socket.AF_INET)
491 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200492 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100493 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000494
Christian Heimesd0486372016-09-10 23:23:33 +0200495 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000497 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000498 "certfile must be specified",
499 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000500 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000501 "certfile must be specified for server-side operations",
502 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000503 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000504 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200505 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100506 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
507 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200508 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200509 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000510 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000511 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000512 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200513 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000515 ssl.wrap_socket(sock,
516 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000517 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200518 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000519 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000520 ssl.wrap_socket(sock,
521 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000522 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000523
Martin Panter3464ea22016-02-01 21:58:11 +0000524 def bad_cert_test(self, certfile):
525 """Check that trying to use the given client certificate fails"""
526 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
527 certfile)
528 sock = socket.socket()
529 self.addCleanup(sock.close)
530 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200531 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200532 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000533
534 def test_empty_cert(self):
535 """Wrapping with an empty cert file"""
536 self.bad_cert_test("nullcert.pem")
537
538 def test_malformed_cert(self):
539 """Wrapping with a badly formatted certificate (syntax error)"""
540 self.bad_cert_test("badcert.pem")
541
542 def test_malformed_key(self):
543 """Wrapping with a badly formatted key (syntax error)"""
544 self.bad_cert_test("badkey.pem")
545
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000546 def test_match_hostname(self):
547 def ok(cert, hostname):
548 ssl.match_hostname(cert, hostname)
549 def fail(cert, hostname):
550 self.assertRaises(ssl.CertificateError,
551 ssl.match_hostname, cert, hostname)
552
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100553 # -- Hostname matching --
554
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'example.com'),),)}
556 ok(cert, 'example.com')
557 ok(cert, 'ExAmple.cOm')
558 fail(cert, 'www.example.com')
559 fail(cert, '.example.com')
560 fail(cert, 'example.org')
561 fail(cert, 'exampleXcom')
562
563 cert = {'subject': ((('commonName', '*.a.com'),),)}
564 ok(cert, 'foo.a.com')
565 fail(cert, 'bar.foo.a.com')
566 fail(cert, 'a.com')
567 fail(cert, 'Xa.com')
568 fail(cert, '.a.com')
569
Mandeep Singhede2ac92017-11-27 04:01:27 +0530570 # only match wildcards when they are the only thing
571 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000572 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530573 fail(cert, 'foo.com')
574 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000575 fail(cert, 'bar.com')
576 fail(cert, 'foo.a.com')
577 fail(cert, 'bar.foo.com')
578
Christian Heimes824f7f32013-08-17 00:54:47 +0200579 # NULL bytes are bad, CVE-2013-4073
580 cert = {'subject': ((('commonName',
581 'null.python.org\x00example.org'),),)}
582 ok(cert, 'null.python.org\x00example.org') # or raise an error?
583 fail(cert, 'example.org')
584 fail(cert, 'null.python.org')
585
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 # error cases with wildcards
587 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
588 fail(cert, 'bar.foo.a.com')
589 fail(cert, 'a.com')
590 fail(cert, 'Xa.com')
591 fail(cert, '.a.com')
592
593 cert = {'subject': ((('commonName', 'a.*.com'),),)}
594 fail(cert, 'a.foo.com')
595 fail(cert, 'a..com')
596 fail(cert, 'a.com')
597
598 # wildcard doesn't match IDNA prefix 'xn--'
599 idna = 'püthon.python.org'.encode("idna").decode("ascii")
600 cert = {'subject': ((('commonName', idna),),)}
601 ok(cert, idna)
602 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
603 fail(cert, idna)
604 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
605 fail(cert, idna)
606
607 # wildcard in first fragment and IDNA A-labels in sequent fragments
608 # are supported.
609 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
610 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530611 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
612 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
614 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
615
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000616 # Slightly fake real-world example
617 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
618 'subject': ((('commonName', 'linuxfrz.org'),),),
619 'subjectAltName': (('DNS', 'linuxfr.org'),
620 ('DNS', 'linuxfr.com'),
621 ('othername', '<unsupported>'))}
622 ok(cert, 'linuxfr.org')
623 ok(cert, 'linuxfr.com')
624 # Not a "DNS" entry
625 fail(cert, '<unsupported>')
626 # When there is a subjectAltName, commonName isn't used
627 fail(cert, 'linuxfrz.org')
628
629 # A pristine real-world example
630 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
631 'subject': ((('countryName', 'US'),),
632 (('stateOrProvinceName', 'California'),),
633 (('localityName', 'Mountain View'),),
634 (('organizationName', 'Google Inc'),),
635 (('commonName', 'mail.google.com'),))}
636 ok(cert, 'mail.google.com')
637 fail(cert, 'gmail.com')
638 # Only commonName is considered
639 fail(cert, 'California')
640
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100641 # -- IPv4 matching --
642 cert = {'subject': ((('commonName', 'example.com'),),),
643 'subjectAltName': (('DNS', 'example.com'),
644 ('IP Address', '10.11.12.13'),
645 ('IP Address', '14.15.16.17'))}
646 ok(cert, '10.11.12.13')
647 ok(cert, '14.15.16.17')
648 fail(cert, '14.15.16.18')
649 fail(cert, 'example.net')
650
651 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800652 if hasattr(socket, 'AF_INET6'):
653 cert = {'subject': ((('commonName', 'example.com'),),),
654 'subjectAltName': (
655 ('DNS', 'example.com'),
656 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
657 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
658 ok(cert, '2001::cafe')
659 ok(cert, '2003::baba')
660 fail(cert, '2003::bebe')
661 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100662
663 # -- Miscellaneous --
664
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000665 # Neither commonName nor subjectAltName
666 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),))}
671 fail(cert, 'mail.google.com')
672
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200673 # No DNS entry in subjectAltName but a commonName
674 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
675 'subject': ((('countryName', 'US'),),
676 (('stateOrProvinceName', 'California'),),
677 (('localityName', 'Mountain View'),),
678 (('commonName', 'mail.google.com'),)),
679 'subjectAltName': (('othername', 'blabla'), )}
680 ok(cert, 'mail.google.com')
681
682 # No DNS entry subjectAltName and no commonName
683 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
684 'subject': ((('countryName', 'US'),),
685 (('stateOrProvinceName', 'California'),),
686 (('localityName', 'Mountain View'),),
687 (('organizationName', 'Google Inc'),)),
688 'subjectAltName': (('othername', 'blabla'),)}
689 fail(cert, 'google.com')
690
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000691 # Empty cert / no cert
692 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
693 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
694
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200695 # Issue #17980: avoid denials of service by refusing more than one
696 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800697 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
698 with self.assertRaisesRegex(
699 ssl.CertificateError,
700 "partial wildcards in leftmost label are not supported"):
701 ssl.match_hostname(cert, 'axxb.example.com')
702
703 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
704 with self.assertRaisesRegex(
705 ssl.CertificateError,
706 "wildcard can only be present in the leftmost label"):
707 ssl.match_hostname(cert, 'www.sub.example.com')
708
709 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
710 with self.assertRaisesRegex(
711 ssl.CertificateError,
712 "too many wildcards"):
713 ssl.match_hostname(cert, 'axxbxxc.example.com')
714
715 cert = {'subject': ((('commonName', '*'),),)}
716 with self.assertRaisesRegex(
717 ssl.CertificateError,
718 "sole wildcard without additional labels are not support"):
719 ssl.match_hostname(cert, 'host')
720
721 cert = {'subject': ((('commonName', '*.com'),),)}
722 with self.assertRaisesRegex(
723 ssl.CertificateError,
724 r"hostname 'com' doesn't match '\*.com'"):
725 ssl.match_hostname(cert, 'com')
726
727 # extra checks for _inet_paton()
728 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
729 with self.assertRaises(ValueError):
730 ssl._inet_paton(invalid)
731 for ipaddr in ['127.0.0.1', '192.168.0.1']:
732 self.assertTrue(ssl._inet_paton(ipaddr))
733 if hasattr(socket, 'AF_INET6'):
734 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
735 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200736
Antoine Pitroud5323212010-10-22 18:19:07 +0000737 def test_server_side(self):
738 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200739 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000740 with socket.socket() as sock:
741 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
742 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000743
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.socket(socket.AF_INET)
    # Register the cleanup right away so the listening socket is closed
    # even if bind/listen/connect or the assertion below fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers the window before the client socket is handed to the SSL
    # wrapper; closing an already-closed socket is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200755
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected client socket returns None for a known binding type.
    client_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_sock) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
    # An unconnected server-side socket behaves the same way.
    server_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_sock, server_side=True,
                          certfile=CERTFILE) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200767
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        wrapped = None
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
775
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths tuple.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_FILE / SSL_CERT_DIR set in the environment are reflected
    # in the cafile / capath fields.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
787
Christian Heimes44109d72013-11-22 01:51:30 +0100788 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
789 def test_enum_certificates(self):
790 self.assertTrue(ssl.enum_certificates("CA"))
791 self.assertTrue(ssl.enum_certificates("ROOT"))
792
793 self.assertRaises(TypeError, ssl.enum_certificates)
794 self.assertRaises(WindowsError, ssl.enum_certificates, "")
795
Christian Heimesc2d65e12013-11-22 16:13:55 +0100796 trust_oids = set()
797 for storename in ("CA", "ROOT"):
798 store = ssl.enum_certificates(storename)
799 self.assertIsInstance(store, list)
800 for element in store:
801 self.assertIsInstance(element, tuple)
802 self.assertEqual(len(element), 3)
803 cert, enc, trust = element
804 self.assertIsInstance(cert, bytes)
805 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
806 self.assertIsInstance(trust, (set, bool))
807 if isinstance(trust, set):
808 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100809
810 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100811 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200812
Christian Heimes46bebee2013-06-09 19:03:31 +0200813 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100814 def test_enum_crls(self):
815 self.assertTrue(ssl.enum_crls("CA"))
816 self.assertRaises(TypeError, ssl.enum_crls)
817 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200818
Christian Heimes44109d72013-11-22 01:51:30 +0100819 crls = ssl.enum_crls("CA")
820 self.assertIsInstance(crls, list)
821 for element in crls:
822 self.assertIsInstance(element, tuple)
823 self.assertEqual(len(element), 2)
824 self.assertIsInstance(element[0], bytes)
825 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimes46bebee2013-06-09 19:03:31 +0200827
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100828 def test_asn1object(self):
829 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
830 '1.3.6.1.5.5.7.3.1')
831
832 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
833 self.assertEqual(val, expected)
834 self.assertEqual(val.nid, 129)
835 self.assertEqual(val.shortname, 'serverAuth')
836 self.assertEqual(val.longname, 'TLS Web Server Authentication')
837 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
838 self.assertIsInstance(val, ssl._ASN1Object)
839 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
840
841 val = ssl._ASN1Object.fromnid(129)
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100845 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
846 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100847 for i in range(1000):
848 try:
849 obj = ssl._ASN1Object.fromnid(i)
850 except ValueError:
851 pass
852 else:
853 self.assertIsInstance(obj.nid, int)
854 self.assertIsInstance(obj.shortname, str)
855 self.assertIsInstance(obj.longname, str)
856 self.assertIsInstance(obj.oid, (str, type(None)))
857
858 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
859 self.assertEqual(val, expected)
860 self.assertIsInstance(val, ssl._ASN1Object)
861 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
862 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
863 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100864 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
865 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100866
Christian Heimes72d28502013-11-23 13:56:58 +0100867 def test_purpose_enum(self):
868 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
869 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
873 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
874 '1.3.6.1.5.5.7.3.1')
875
876 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
877 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
881 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
882 '1.3.6.1.5.5.7.3.2')
883
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must fail with NotImplementedError,
    # both through the test helper and through SSLContext.wrap_socket().
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as helper_err:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(helper_err.exception),
                     "only stream sockets are supported")

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as context_err:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(context_err.exception),
                     "only stream sockets are supported")
894
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that ssl.cert_time_to_seconds() converts
    # *timestring* to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
897
def cert_time_fail(self, timestring):
    # Helper: assert that ssl.cert_time_to_seconds() rejects
    # *timestring* with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
901
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, expected_seconds in known_conversions:
        self.cert_time_ok(cert_time, expected_seconds)
909
910 def test_cert_time_to_seconds(self):
911 timestring = "Jan 5 09:34:43 2018 GMT"
912 ts = 1515144883.0
913 self.cert_time_ok(timestring, ts)
914 # accept keyword parameter, assert its name
915 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
916 # accept both %e and %d (space or zero generated by strftime)
917 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
918 # case-insensitive
919 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
920 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
921 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
922 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
923 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
924 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
925 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
926 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
927
928 newyear_ts = 1230768000.0
929 # leap seconds
930 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
931 # same timestamp
932 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
933
934 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
935 # allow 60th second (even if it is not a leap second)
936 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
937 # allow 2nd leap second for compatibility with time.strptime()
938 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
939 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
940
Mike53f7a7c2017-12-14 14:04:53 +0300941 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200942 # 99991231235959Z (rfc 5280)
943 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
944
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as rendered by strftime().
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is accepted, the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
959
def test_connect_ex_error(self):
    # connect_ex() to a bound but non-listening port must return one of
    # the accepted errno codes.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
975
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100976
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977class ContextTests(unittest.TestCase):
978
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # With no argument the context reports PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
987
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute reflects the constructor argument.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
993
994 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000996 ctx.set_ciphers("ALL")
997 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000998 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000999 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001000
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # No enabled cipher suite name may contain any of these markers.
    forbidden = ("PSK", "SRP", "MD5", "RC4", "3DES")
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in context.get_ciphers():
        suite_name = suite['name']
        for marker in forbidden:
            self.assertNotIn(marker, suite_name)
1013
Christian Heimes25bfcd52016-09-06 00:04:45 +02001014 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1015 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001016 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001017 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001018 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001019 self.assertIn('AES256-GCM-SHA384', names)
1020 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001021
@skip_if_broken_ubuntu_ssl
def test_options(self):
    # Check the default option mask and that options can be set and,
    # where supported, cleared again.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    expected |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                 OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(expected, ctx.options)

    ctx.options = ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Clearing is unsupported: resetting the mask must be refused.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1043
Christian Heimesa170fa12017-09-15 20:27:30 +02001044 def test_verify_mode_protocol(self):
1045 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001046 # Default value
1047 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1048 ctx.verify_mode = ssl.CERT_OPTIONAL
1049 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1050 ctx.verify_mode = ssl.CERT_REQUIRED
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 ctx.verify_mode = ssl.CERT_NONE
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1054 with self.assertRaises(TypeError):
1055 ctx.verify_mode = None
1056 with self.assertRaises(ValueError):
1057 ctx.verify_mode = 42
1058
Christian Heimesa170fa12017-09-15 20:27:30 +02001059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1060 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1061 self.assertFalse(ctx.check_hostname)
1062
1063 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1065 self.assertTrue(ctx.check_hostname)
1066
Christian Heimes61d478c2018-01-27 15:51:38 +01001067 def test_hostname_checks_common_name(self):
1068 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1069 self.assertTrue(ctx.hostname_checks_common_name)
1070 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1071 ctx.hostname_checks_common_name = True
1072 self.assertTrue(ctx.hostname_checks_common_name)
1073 ctx.hostname_checks_common_name = False
1074 self.assertFalse(ctx.hostname_checks_common_name)
1075 ctx.hostname_checks_common_name = True
1076 self.assertTrue(ctx.hostname_checks_common_name)
1077 else:
1078 with self.assertRaises(AttributeError):
1079 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001080
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001081 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1082 "required OpenSSL 1.1.0g")
1083 def test_min_max_version(self):
1084 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1085 self.assertEqual(
1086 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1087 )
1088 self.assertEqual(
1089 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1090 )
1091
1092 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1093 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1094 self.assertEqual(
1095 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1096 )
1097 self.assertEqual(
1098 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1102 ctx.maximum_version = ssl.TLSVersion.TLSv1
1103 self.assertEqual(
1104 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1105 )
1106 self.assertEqual(
1107 ctx.maximum_version, ssl.TLSVersion.TLSv1
1108 )
1109
1110 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1111 self.assertEqual(
1112 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1113 )
1114
1115 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1116 self.assertIn(
1117 ctx.maximum_version,
1118 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1119 )
1120
1121 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1122 self.assertIn(
1123 ctx.minimum_version,
1124 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1125 )
1126
1127 with self.assertRaises(ValueError):
1128 ctx.minimum_version = 42
1129
1130 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1131
1132 self.assertEqual(
1133 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1134 )
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 )
1138 with self.assertRaises(ValueError):
1139 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1140 with self.assertRaises(ValueError):
1141 ctx.maximum_version = ssl.TLSVersion.TLSv1
1142
1143
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    # verify_flags has the expected default, round-trips assigned values
    # and rejects None.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)
    with self.assertRaises(TypeError):
        context.verify_flags = None
1163
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, bad files, and every supported password form.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    for key_source in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=key_source)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(unusable)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for incomplete in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(incomplete)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: str, bytes and bytearray
    # passwords, first by keyword and then positionally.
    for secret in (KEY_PASSWORD,
                   KEY_PASSWORD.encode(),
                   bytearray(KEY_PASSWORD.encode())):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in (KEY_PASSWORD,
                   KEY_PASSWORD.encode(),
                   bytearray(KEY_PASSWORD.encode())):
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def getpass_unicode():
        return KEY_PASSWORD

    def getpass_bytes():
        return KEY_PASSWORD.encode()

    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def getpass_badpass():
        return "badpass"

    def getpass_huge():
        return b'a' * (1024 * 1024)

    def getpass_bad_type():
        return 9

    def getpass_exception():
        raise Exception('getpass error')

    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        getpass_unicode,
        getpass_bytes,
        getpass_bytearray,
        GetPassCallable(),
        GetPassCallable().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001246
def test_load_verify_locations(self):
    # Exercise load_verify_locations() with str and bytes paths, missing
    # or invalid arguments, and unreadable or malformed files.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(ca_file)
        context.load_verify_locations(cafile=ca_file, capath=None)
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        context.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)
    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
1265
def test_load_verify_cadata(self):
    # Check load_verify_locations(cadata=...) with PEM text and DER bytes.
    # The "x509_ca" counter of cert_store_stats() is used to observe how
    # many CA certificates ended up in the store.
    def read_pem_and_der(path):
        # Return the certificate stored at *path* as (PEM text, DER bytes).
        with open(path) as f:
            pem_text = f.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def fresh_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    ctx = fresh_context()
    assert_ca_count(ctx, 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(ctx, 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)

    # combined
    ctx = fresh_context()
    ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(ctx, 2)

    # with junk around the certs
    ctx = fresh_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(ctx, 2)

    # test DER
    ctx = fresh_context()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(ctx, 2)

    # combined
    ctx = fresh_context()
    ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(ctx, 2)

    # error cases
    ctx = fresh_context()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1322
1323
def test_load_dh_params(self):
    # load_dh_params() accepts a DH parameter file and rejects missing
    # arguments, missing files and files with the wrong content.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    # The bytes path variant is not exercised on Windows
    if os.name != 'nt':
        ctx.load_dh_params(BYTES_DHFILE)

    for bad_args in ((), (None,)):
        with self.assertRaises(TypeError):
            ctx.load_dh_params(*bad_args)

    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)

    # CERTFILE is not a DH parameter file
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1336
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context must report zero for every session
    # counter, whatever protocol it was created with.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for proto in PROTOCOLS:
        ctx = ssl.SSLContext(proto)
        self.assertEqual(ctx.session_stats(), all_zero)
1354
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001355 def test_set_default_verify_paths(self):
1356 # There's not much we can do to test that it acts as expected,
1357 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001359 ctx.set_default_verify_paths()
1360
Antoine Pitrou501da612011-12-21 09:27:41 +01001361 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001362 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001363 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001364 ctx.set_ecdh_curve("prime256v1")
1365 ctx.set_ecdh_curve(b"prime256v1")
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1367 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1369 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1370
@needs_sni
def test_sni_callback(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        ctx.set_servername_callback()
    for not_a_callback in (4, "", ctx):
        with self.assertRaises(TypeError):
            ctx.set_servername_callback(not_a_callback)

    def noop_callback(sock, servername, ctx):
        pass

    # Both clearing the callback and installing a real one are accepted
    ctx.set_servername_callback(None)
    ctx.set_servername_callback(noop_callback)
1385
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx_ref = weakref.ref(ctx)

    # The default argument makes the callback reference the context,
    # which in turn references the callback once it is installed.
    def cyclic_callback(sock, servername, ctx, cycle=ctx):
        pass

    ctx.set_servername_callback(cyclic_callback)
    del ctx, cyclic_callback
    gc.collect()
    self.assertIs(ctx_ref(), None)
1398
def test_cert_store_stats(self):
    # Track how cert_store_stats() evolves as certificates are loaded.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509, x509_ca):
        # No CRL is ever loaded here, so 'crl' is expected to stay at 0.
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    assert_stats(x509=0, x509_ca=0)
    # Loading a certificate chain leaves the counters untouched
    ctx.load_cert_chain(CERTFILE)
    assert_stats(x509=0, x509_ca=0)
    # CERTFILE is counted as a plain certificate, not as a CA
    ctx.load_verify_locations(CERTFILE)
    assert_stats(x509=1, x509_ca=0)
    # CAFILE_CACERT bumps both counters
    ctx.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509=2, x509_ca=1)
1412
def test_get_ca_certs(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    ctx.load_verify_locations(CAFILE_CACERT)

    # The expected issuer and subject are the same distinguished name
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(ctx.get_ca_certs(), [expected_cert])

    # With a true argument the same certificate comes back as DER bytes
    with open(CAFILE_CACERT) as f:
        pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    self.assertEqual(ctx.get_ca_certs(True), [der])
1440
Christian Heimes72d28502013-11-23 13:56:58 +01001441 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001442 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001443 ctx.load_default_certs()
1444
Christian Heimesa170fa12017-09-15 20:27:30 +02001445 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001446 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1447 ctx.load_default_certs()
1448
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001450 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1451
Christian Heimesa170fa12017-09-15 20:27:30 +02001452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001453 self.assertRaises(TypeError, ctx.load_default_certs, None)
1454 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1455
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data,
    # load_default_certs() is expected to pick up exactly one certificate.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for var_name, var_value in overrides:
            env[var_name] = var_value
        ctx.load_default_certs()
        self.assertEqual(ctx.cert_store_stats(),
                         {"crl": 0, "x509": 1, "x509_ca": 0})
1465
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # First record what the default certificates look like without any
    # environment override ...
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected = baseline_ctx.cert_store_stats()

    # ... then load again with SSL_CERT_DIR / SSL_CERT_FILE set: exactly
    # one more x509 entry is expected on top of the baseline.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        ctx.load_default_certs()
        expected["x509"] += 1
        self.assertEqual(ctx.cert_store_stats(), expected)
1480
def _assert_context_options(self, ctx):
    # Assert that *ctx* has the expected option bits set.
    # OP_NO_SSLv2 is always required; each of the other flags is only
    # checked when its constant is nonzero.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1495
def test_create_default_context(self):
    # Every context returned by create_default_context() uses
    # PROTOCOL_TLS and carries the expected options; the verify mode
    # depends on the purpose.
    def assert_defaults(ctx, verify_mode, hostname_checked=None):
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(ctx.verify_mode, verify_mode)
        if hostname_checked:
            self.assertTrue(ctx.check_hostname)
        self._assert_context_options(ctx)

    # Default purpose: certificates required and host names checked
    assert_defaults(ssl.create_default_context(), ssl.CERT_REQUIRED,
                    hostname_checked=True)

    # All three CA sources given at once
    with open(SIGNING_CA) as f:
        cadata = f.read()
    ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                     cadata=cadata)
    assert_defaults(ctx, ssl.CERT_REQUIRED)

    # CLIENT_AUTH purpose: no certificate is requested
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    assert_defaults(ctx, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001516
def test__create_stdlib_context(self):
    # _create_stdlib_context() defaults to PROTOCOL_TLS / CERT_NONE /
    # no host name check, and honours explicit overrides.
    def assert_context(ctx, protocol, verify_mode):
        self.assertEqual(ctx.protocol, protocol)
        self.assertEqual(ctx.verify_mode, verify_mode)

    ctx = ssl._create_stdlib_context()
    assert_context(ctx, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self.assertFalse(ctx.check_hostname)
    self._assert_context_options(ctx)

    # Explicit protocol
    ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    assert_context(ctx, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)
    self._assert_context_options(ctx)

    # Explicit protocol, verification and host name check
    ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     check_hostname=True)
    assert_context(ctx, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED)
    self.assertTrue(ctx.check_hostname)
    self._assert_context_options(ctx)

    # Explicit purpose
    ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    assert_context(ctx, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
    self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001543 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001544 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001545 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546
Christian Heimese82c0342017-09-15 20:29:57 +02001547 # Auto set CERT_REQUIRED
1548 ctx.check_hostname = True
1549 self.assertTrue(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1551 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001552 ctx.verify_mode = ssl.CERT_REQUIRED
1553 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001554 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
Christian Heimese82c0342017-09-15 20:29:57 +02001556 # Changing verify_mode does not affect check_hostname
1557 ctx.check_hostname = False
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
1561 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1562 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563 ctx.check_hostname = True
1564 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001565 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1566
1567 ctx.check_hostname = False
1568 ctx.verify_mode = ssl.CERT_OPTIONAL
1569 ctx.check_hostname = False
1570 self.assertFalse(ctx.check_hostname)
1571 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1572 # keep CERT_OPTIONAL
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001576
1577 # Cannot set CERT_NONE with check_hostname enabled
1578 with self.assertRaises(ValueError):
1579 ctx.verify_mode = ssl.CERT_NONE
1580 ctx.check_hostname = False
1581 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001582 ctx.verify_mode = ssl.CERT_NONE
1583 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
Christian Heimes5fe668c2016-09-12 00:01:11 +02001585 def test_context_client_server(self):
1586 # PROTOCOL_TLS_CLIENT has sane defaults
1587 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1588 self.assertTrue(ctx.check_hostname)
1589 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1590
1591 # PROTOCOL_TLS_SERVER has different but also sane defaults
1592 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1595
Christian Heimes4df60f12017-09-15 20:26:05 +02001596 def test_context_custom_class(self):
1597 class MySSLSocket(ssl.SSLSocket):
1598 pass
1599
1600 class MySSLObject(ssl.SSLObject):
1601 pass
1602
1603 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1604 ctx.sslsocket_class = MySSLSocket
1605 ctx.sslobject_class = MySSLObject
1606
1607 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1608 self.assertIsInstance(sock, MySSLSocket)
1609 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1610 self.assertIsInstance(obj, MySSLObject)
1611
Antoine Pitrou152efa22010-05-16 18:19:27 +00001612
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001613class SSLErrorTests(unittest.TestCase):
1614
1615 def test_str(self):
1616 # The str() of a SSLError doesn't include the errno
1617 e = ssl.SSLError(1, "foo")
1618 self.assertEqual(str(e), "foo")
1619 self.assertEqual(e.errno, 1)
1620 # Same for a subclass
1621 e = ssl.SSLZeroReturnError(1, "foo")
1622 self.assertEqual(str(e), "foo")
1623 self.assertEqual(e.errno, 1)
1624
1625 def test_lib_reason(self):
1626 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001627 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001628 with self.assertRaises(ssl.SSLError) as cm:
1629 ctx.load_dh_params(CERTFILE)
1630 self.assertEqual(cm.exception.library, 'PEM')
1631 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1632 s = str(cm.exception)
1633 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1634
1635 def test_subclass(self):
1636 # Check that the appropriate SSLError subclass is raised
1637 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001638 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1639 ctx.check_hostname = False
1640 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001641 with socket.socket() as s:
1642 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001643 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001644 c = socket.socket()
1645 c.connect(s.getsockname())
1646 c.setblocking(False)
1647 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001648 with self.assertRaises(ssl.SSLWantReadError) as cm:
1649 c.do_handshake()
1650 s = str(cm.exception)
1651 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1652 # For compatibility
1653 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1654
1655
Christian Heimes61d478c2018-01-27 15:51:38 +01001656 def test_bad_server_hostname(self):
1657 ctx = ssl.create_default_context()
1658 with self.assertRaises(ValueError):
1659 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1660 server_hostname="")
1661 with self.assertRaises(ValueError):
1662 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1663 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001664 with self.assertRaises(TypeError):
1665 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1666 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001667
1668
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data written to the BIO is read back in order; an empty BIO
        # yields b''.
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')

        # Consecutive writes are concatenated
        for chunk in (b'foo', b'bar'):
            bio.write(chunk)
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')

        # Sized reads return at most the requested number of bytes
        bio.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(bio.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() has been called AND all
        # buffered data has been consumed.
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        # Data is still pending, so not at EOF yet
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        # The buffer has been drained
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        # pending tracks the number of buffered bytes
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        # Drain one byte at a time
        for remaining in (2, 1, 0):
            bio.read(1)
            self.assertEqual(bio.pending, remaining)
        # Fill one byte at a time
        for buffered in (1, 2, 3):
            bio.write(b'x')
            self.assertEqual(bio.pending, buffered)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike
        bio = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in payloads:
            bio.write(payload)
            self.assertEqual(bio.read(), expected)

    def test_error_types(self):
        # write() rejects objects that are not bytes-like
        bio = ssl.MemoryBIO()
        for not_bytes_like in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                bio.write(not_bytes_like)
1730
1731
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # Calling the SSLObject constructor directly is refused with a
        # TypeError whose message mentions "public constructor".
        memory_bio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(memory_bio, memory_bio)
1737
1738
Martin Panter3840b2a2016-03-27 01:53:46 +00001739class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001740 """Tests that connect to a simple server running in the background"""
1741
def setUp(self):
    # Start a background echo server for the test and make sure it is
    # shut down again when the test finishes.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    # The server is used as a context manager "by hand": enter it now,
    # and register the matching exit as a cleanup.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001747
def test_connect(self):
    # Without certificate validation, getpeercert() is expected to be
    # an empty dict
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
        self.assertFalse(s.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as s:
        s.connect(self.server_addr)
        self.assertTrue(s.getpeercert())
        self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001762
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(s.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        s.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001772
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET),
                               cert_reqs=ssl.CERT_REQUIRED,
                               ca_certs=SIGNING_CA)
    self.addCleanup(wrapped.close)
    # A return value of 0 signals success
    rc = wrapped.connect_ex(self.server_addr)
    self.assertEqual(0, rc)
    self.assertTrue(wrapped.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001781
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED,
                         ca_certs=SIGNING_CA,
                         do_handshake_on_connect=False)
    self.addCleanup(s.close)
    s.setblocking(False)
    rc = s.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    acceptable_codes = (0, errno.EINPROGRESS, errno.EWOULDBLOCK)
    self.assertIn(rc, acceptable_codes)
    # Wait for connect to finish
    select.select([], [s], [], 5.0)
    # Non-blocking handshake: retry until it completes, waiting for the
    # socket to become readable or writable as requested.
    while True:
        try:
            s.do_handshake()
        except ssl.SSLWantReadError:
            select.select([s], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [s], [], 5.0)
        else:
            break
    # SSL established
    self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001807
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def new_wrapped_socket(**kwargs):
        return ctx.wrap_socket(socket.socket(socket.AF_INET), **kwargs)

    with new_wrapped_socket() as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
    # Same with a server hostname
    with new_wrapped_socket(server_hostname="dummy") as s:
        s.connect(self.server_addr)

    ctx.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    ctx.load_verify_locations(SIGNING_CA)
    with new_wrapped_socket() as s:
        s.connect(self.server_addr)
        self.assertTrue(s.getpeercert())
1825
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    s = ctx.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(s.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        s.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001836
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The check is run twice: with a str and with a bytes `capath`.
    for capath in (CAPATH, BYTES_CAPATH):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=capath)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001859
def test_connect_cadata(self):
    # Verify the server certificate against CA data passed in memory,
    # first as PEM text and then as DER bytes.
    with open(SIGNING_CA) as f:
        pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(pem)

    for cadata in (pem, der):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=cadata)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())
Christian Heimesefff7062013-11-21 03:35:02 +01001880
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ssl_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    ssl_sock.connect(self.server_addr)
    fd = ssl_sock.fileno()
    ssl_sock.makefile().close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ssl_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as cm:
        os.read(fd, 0)
    self.assertEqual(cm.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001899
Antoine Pitrou480a1242010-04-28 21:37:09 +00001900 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001901 s = socket.socket(socket.AF_INET)
1902 s.connect(self.server_addr)
1903 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001904 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001905 cert_reqs=ssl.CERT_NONE,
1906 do_handshake_on_connect=False)
1907 self.addCleanup(s.close)
1908 count = 0
1909 while True:
1910 try:
1911 count += 1
1912 s.do_handshake()
1913 break
1914 except ssl.SSLWantReadError:
1915 select.select([s], [], [])
1916 except ssl.SSLWantWriteError:
1917 select.select([], [s], [])
1918 if support.verbose:
1919 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001920
Antoine Pitrou480a1242010-04-28 21:37:09 +00001921 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001922 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001923
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 def test_get_server_certificate_fail(self):
1925 # Connection failure crashes ThreadedEchoServer, so run this in an
1926 # independent test method
1927 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001928
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001929 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001930 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001931 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1932 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001933 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001934 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1935 s.connect(self.server_addr)
1936 # Error checking can happen at instantiation or when connecting
1937 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1938 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001939 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001940 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1941 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001942
Christian Heimes9a5395a2013-06-17 15:44:12 +02001943 def test_get_ca_certs_capath(self):
1944 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001945 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001946 ctx.load_verify_locations(capath=CAPATH)
1947 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001948 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1949 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001950 s.connect(self.server_addr)
1951 cert = s.getpeercert()
1952 self.assertTrue(cert)
1953 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001954
Christian Heimes575596e2013-12-15 21:49:17 +01001955 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001956 def test_context_setget(self):
1957 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001958 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1959 ctx1.load_verify_locations(capath=CAPATH)
1960 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1961 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001962 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001963 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001964 ss.connect(self.server_addr)
1965 self.assertIs(ss.context, ctx1)
1966 self.assertIs(ss._sslobj.context, ctx1)
1967 ss.context = ctx2
1968 self.assertIs(ss.context, ctx2)
1969 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001970
1971 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1972 # A simple IO loop. Call func(*args) depending on the error we get
1973 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1974 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001975 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001976 count = 0
1977 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001978 if time.monotonic() > deadline:
1979 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001980 errno = None
1981 count += 1
1982 try:
1983 ret = func(*args)
1984 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001985 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001986 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001987 raise
1988 errno = e.errno
1989 # Get any data from the outgoing BIO irrespective of any error, and
1990 # send it to the socket.
1991 buf = outgoing.read()
1992 sock.sendall(buf)
1993 # If there's no error, we're done. For WANT_READ, we need to get
1994 # data from the socket and put it in the incoming BIO.
1995 if errno is None:
1996 break
1997 elif errno == ssl.SSL_ERROR_WANT_READ:
1998 buf = sock.recv(32768)
1999 if buf:
2000 incoming.write(buf)
2001 else:
2002 incoming.write_eof()
2003 if support.verbose:
2004 sys.stdout.write("Needed %d calls to complete %s().\n"
2005 % (count, func.__name__))
2006 return ret
2007
Martin Panter3840b2a2016-03-27 01:53:46 +00002008 def test_bio_handshake(self):
2009 sock = socket.socket(socket.AF_INET)
2010 self.addCleanup(sock.close)
2011 sock.connect(self.server_addr)
2012 incoming = ssl.MemoryBIO()
2013 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002014 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2015 self.assertTrue(ctx.check_hostname)
2016 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002017 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002018 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2019 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002020 self.assertIs(sslobj._sslobj.owner, sslobj)
2021 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002022 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002023 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002024 self.assertRaises(ValueError, sslobj.getpeercert)
2025 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2026 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2027 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2028 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002029 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002030 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002031 self.assertTrue(sslobj.getpeercert())
2032 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2033 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2034 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002035 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002036 except ssl.SSLSyscallError:
2037 # If the server shuts down the TCP connection without sending a
2038 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2039 pass
2040 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2041
2042 def test_bio_read_write_data(self):
2043 sock = socket.socket(socket.AF_INET)
2044 self.addCleanup(sock.close)
2045 sock.connect(self.server_addr)
2046 incoming = ssl.MemoryBIO()
2047 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 ctx.verify_mode = ssl.CERT_NONE
2050 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2051 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2052 req = b'FOO\n'
2053 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2054 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2055 self.assertEqual(buf, b'foo\n')
2056 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002057
2058
class NetworkedTests(unittest.TestCase):
    """Tests that talk to hosts on the public internet."""

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            raw = socket.socket(socket.AF_INET)
            s = test_wrap_socket(raw,
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 do_handshake_on_connect=False)
            self.addCleanup(s.close)
            # A timeout this small is expected to expire before the
            # connection can complete.
            s.settimeout(0.0000001)
            rc = s.connect_ex((REMOTE_HOST, 443))
            if rc == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            would_block = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(rc, would_block)

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate-retrieval helpers against an IPv6-only host.
        with support.transient_internet('ipv6.google.com'):
            for check in (_test_get_server_certificate,
                          _test_get_server_certificate_fail):
                check(self, 'ipv6.google.com', 443)
2080
Martin Panter3840b2a2016-03-27 01:53:46 +00002081
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail *test* if empty.

    The first fetch passes no CA file; the second passes ``ca_certs=cert``.
    In verbose mode the certificate from the second fetch is printed.
    """
    def fetch(**options):
        # One retrieval attempt; an empty result is a test failure.
        pem = ssl.get_server_certificate((host, port), **options)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
        return pem

    fetch()
    verified_pem = fetch(ca_certs=cert)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, verified_pem))
2092
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching host:port's certificate with CERTFILE as CA fails.

    An ssl.SSLError is the expected outcome (printed in verbose mode); a
    successful retrieval fails *test*.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2102
2103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002104from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002105
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a daemon thread, optionally speaking TLS.

    The server binds a listening socket in __init__ (the chosen port is
    available as ``port``) and, once started, accepts connections one at a
    time, handing each to a ConnectionHandler thread and waiting for it to
    finish.  It can be used as a context manager: entering starts the thread
    and waits until it is listening, leaving stops and joins it.

    Results observed by the handlers are collected on the server instance:
    ``selected_npn_protocols``, ``selected_alpn_protocols``,
    ``shared_ciphers`` and ``conn_errors`` (error messages as strings).
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is unencrypted, else the SSL socket.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            # Wrap self.sock with the server's SSL context (server side).
            # Returns True on success.  On failure the error text is stored
            # in server.conn_errors, the connection is closed and False is
            # returned.
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # Note that, unlike the branch below, this one does not
                # stop the server.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record what was negotiated and, in
                # verbose+chatty mode, describe the connection.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            # Read from the SSL layer when active, else from the plain socket.
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            # Write through the SSL layer when active, else the plain socket.
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            # Close whichever socket object currently owns the connection.
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            # Serve one connection.  Unless the server is in STARTTLS mode
            # the connection is wrapped immediately.  Each message read is
            # then interpreted (after stripping whitespace) as:
            #   empty           -> EOF: unwrap and close
            #   b'over'         -> close and exit
            #   b'STARTTLS'     -> (STARTTLS mode) reply OK, switch to TLS
            #   b'ENDTLS'       -> (STARTTLS mode, TLS active) reply OK,
            #                      switch back to plain text
            #   b'CB tls-unique'-> reply with repr() of the channel binding
            #   anything else   -> echo the message in lower case
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # NOTE(review): self.sslconn is None for a STARTTLS
                        # connection that never upgraded; that case looks
                        # unhandled here -- confirm.
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        # When *context* is given it is used as is and the other SSL options
        # (ssl_version, certreqs, cacerts, certificate, npn/alpn protocols,
        # ciphers) are ignored; otherwise a context is built from them,
        # defaulting to PROTOCOL_TLS_SERVER and CERT_NONE.
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        # *flag*, if given, is an event that run() sets once listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        # Accept loop.  The short accept timeout lets the loop notice
        # stop() promptly.  Each handler is joined right after it is
        # started, so connections are served strictly one at a time.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                # Any other failure is only reported (verbose+chatty) and
                # the loop keeps accepting.
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        # Ask the accept loop to exit; it does so on its next iteration.
        self.active = False
2350
class AsyncoreEchoServer(threading.Thread):
    """TLS echo server driven by asyncore, run in a daemon thread.

    The listening dispatcher is created in __init__ (its port is available
    as ``port``).  Used as a context manager, entering starts the thread and
    waits until its loop is running; leaving stops and joins it and clears
    asyncore's socket map.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        # Listening dispatcher: creates one ConnectionHandler per accepted
        # connection.

        class ConnectionHandler(asyncore.dispatcher_with_send):
            # Per-connection dispatcher: completes the TLS handshake without
            # blocking, then echoes received data in lower case.

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data the SSL layer has already buffered before
                # reporting the socket as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Make one handshake attempt.  "Want read/write" means try
                # again on a later read event; EOF or ECONNABORTED closes
                # the connection; other SSL errors propagate.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # Any OSError other than ECONNABORTED is silently
                    # ignored here, leaving the handshake pending.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                # While the handshake is pending, read events advance it;
                # afterwards they carry application data to echo.
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception being handled instead of letting
                # asyncore swallow it.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler is not stored here; presumably the asyncore base
            # class registers it in asyncore's socket map -- confirm.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception being handled instead of letting
            # asyncore swallow it.
            raise

    def __init__(self, certfile):
        # Optional event set by run() once the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() signals it is active.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        # *flag*, if given, is an event that run() sets once active.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        # Run the asyncore loop repeatedly until stop() clears self.active.
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Best effort: every exception raised out of the loop
                # (including those re-raised by handle_error) is dropped.
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002468
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as bytes, bytearray and memoryview) and
    each reply must equal ``indata.lower()``, otherwise AssertionError is
    raised.  Returns a dict describing the client side of the connection
    plus the protocols and ciphers recorded by the server.
    """
    # NOTE(review): the server is always created with connectionchatty=False;
    # the *connectionchatty* argument only controls client-side output.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    stats = {}
    expected = indata.lower()
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name,
                                                 session=session)
        with client_sock as s:
            s.connect((HOST, server.port))
            for arg in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo server this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002518
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # Looked up unconditionally, so an unknown certsreqs value raises
    # KeyError even when not running verbosely.
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return
    # The connection went through: check that this was expected and, when a
    # version string was given, that it is the one negotiated.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2577
2578
2579class ThreadedTests(unittest.TestCase):
2580
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server.

    Each generic protocol in PROTOCOLS is run with one context shared
    by both peers.  The role-specific PROTOCOL_TLS_CLIENT and
    PROTOCOL_TLS_SERVER contexts from testing_context() are then run
    once in their proper roles and several times in wrong roles, where
    an SSLError is expected.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # The role-specific protocols are handled separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            shared_context = ssl.SSLContext(protocol)
            shared_context.load_cert_chain(CERTFILE)
            server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Proper roles: this combination must succeed.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # (subTest client label, subTest server label,
    #  context used as client, context used as server, extra kwargs)
    wrong_role_message = 'called a function you should not call'
    wrong_role_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        # NOTE(review): the label says client/client, but the contexts
        # passed are the same pair as in the first case -- confirm
        # whether this is intended.
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in \
            wrong_role_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn(wrong_role_message, str(caught.exception))
2628
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    The client socket is wrapped with do_handshake_on_connect=False,
    so getpeercert() must raise ValueError right after connect().
    Once do_handshake() has run, the returned certificate must carry
    the expected subject organisation and a notBefore date that
    precedes notAfter.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as sock:
        sock.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        with self.assertRaises(ValueError):
            sock.getpeercert()
        sock.do_handshake()

        peer_cert = sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
        negotiated = sock.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(peer_cert) + '\n')
            sys.stdout.write(
                "Connection cipher is " + str(negotiated) + '.\n')

        if 'subject' not in peer_cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(peer_cert))
        expected_rdn = (('organizationName', 'Python Software Foundation'),)
        if expected_rdn not in peer_cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # Both validity bounds must be present and correctly ordered.
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, peer_cert)
        valid_from = ssl.cert_time_to_seconds(peer_cert['notBefore'])
        valid_until = ssl.cert_time_to_seconds(peer_cert['notAfter'])
        self.assertLess(valid_from, valid_until)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002664
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL.

    Three handshakes are attempted against a fresh echo server each
    time: with the default verify flags (succeeds), with
    VERIFY_CRL_CHECK_LEAF but no CRL loaded (fails verification), and
    with the CRL file loaded (succeeds again).
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # VERIFY_X509_TRUSTED_FIRST may be absent from the ssl module.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def expect_successful_handshake():
        # Connect to a new server and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                client_context.wrap_socket(socket.socket(),
                                           server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            peer_cert = sock.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    expect_successful_handshake()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as sock:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            sock.connect((HOST, server.port))

    # now load a CRL file.  The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    expect_successful_handshake()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002706
def test_check_hostname(self):
    """Check hostname verification on the client context.

    Covers a matching server_hostname (handshake succeeds), a wrong
    one (CertificateError on connect) and a missing one (ValueError
    from wrap_socket itself).
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as sock:
        sock.connect((HOST, server.port))
        peer_cert = sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as sock:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            sock.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as plain_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(plain_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002739
def test_ecc_cert(self):
    """Handshake with a server that only has an ECC certificate.

    The client trusts SIGNING_CA and restricts its cipher list to
    ECDHE/ECDSA suites without RSA authentication.  The handshake must
    yield a peer certificate and, when the cipher name carries that
    information, an ECDHE-ECDSA cipher suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            # The previous assertTrue(cipher[:2], (...)) treated the
            # tuple as the failure message and so never compared
            # anything.  TLS 1.3 is not disabled in this test and its
            # suite names (e.g. TLS_AES_256_GCM_SHA384) have no
            # key-exchange/authentication prefix, so the name is only
            # checked for earlier protocol versions.
            if s.version() != 'TLSv1.3':
                cipher = s.cipher()[0].split('-')
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2760
def test_dual_rsa_ecc(self):
    """Handshake with a server that has both ECC and RSA certificates.

    The client disables TLS 1.3 and only offers ECDHE/ECDSA suites
    without RSA authentication, so the negotiated cipher suite must be
    an ECDHE-ECDSA one even though the server also has an RSA key/cert
    pair loaded.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertTrue(cipher[:2], (...)) used the tuple as the
            # failure message and never compared anything; compare
            # against a list because str.split() returns a list.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2786
def test_check_hostname_idn(self):
    """Check hostname verification with internationalized names.

    The server presents the IDNSANSFILE certificate.  Each hostname is
    given to wrap_socket() in one of several spellings (unicode,
    A-label str, A-label bytes); SSLSocket.server_hostname must report
    the expected A-label form both before and after the handshake.
    A hostname not covered by the certificate must be rejected.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways; each entry is (name passed in, name expected back)
    hostname_cases = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # The unicode spelling of the IDNA 2008 name is disabled:
        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
    ]
    for given_name, expected_name in hostname_cases:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=given_name) as sock:
            self.assertEqual(sock.server_hostname, expected_name)
            sock.connect((HOST, server.port))
            peer_cert = sock.getpeercert()
            self.assertEqual(sock.server_hostname, expected_name)
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as sock:
        with self.assertRaises(ssl.CertificateError):
            sock.connect((HOST, server.port))
2842
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.  The client
    is capped at TLS 1.2, so the failure is expected during connect().
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as sock:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                sock.connect((HOST, server.port))
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Any OSError other than a connection reset is unexpected.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2879
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """Check that a wrong client certificate is rejected under TLS 1.3.

    Both peers require at least TLS 1.3 and the server demands a
    client certificate.  connect() itself is expected to succeed; the
    rejection must surface on the first write/read afterwards.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as sock:
            # TLS 1.3 perform client cert exchange after handshake
            sock.connect((HOST, server.port))
            try:
                sock.write(b'data')
                sock.read(4)
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Any OSError other than a connection reset is unexpected.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2910
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.

    A listener thread accepts one plain TCP connection and closes it
    immediately; the main thread then tries to wrap its end of the
    connection and must get an OSError.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        listen_sock.listen()
        listener_ready.set()
        accepted, _addr = listen_sock.accept()
        accepted.close()
        listen_sock.close()
        listener_gone.set()

    def connector():
        listener_ready.wait()
        with socket.socket() as client_sock:
            client_sock.connect((HOST, port))
            # Only attempt the handshake once the peer has hung up.
            listener_gone.wait()
            try:
                ssl_sock = test_wrap_socket(client_sock)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002951
def test_ssl_cert_verify_error(self):
    """Check the details of a certificate verification failure.

    The client context has no CA certificates loaded, so verifying the
    server's certificate must fail.  The raised error must be an
    SSLCertVerificationError with verify code 20 and the matching
    verify message, and both must show up in its repr().
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # Deliberately no load_verify_locations() on the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail if connect() succeeds;
            # the former try/except had no else branch and passed
            # silently in that case.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
2974
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options.

    SSLv2 paired with SSLv2 must succeed for every certificate
    requirement; every other client protocol or option set listed
    here is expected to fail.
    """
    if support.verbose:
        sys.stdout.write("\n")

    sslv2 = ssl.PROTOCOL_SSLv2
    try_protocol_combo(sslv2, sslv2, True)
    for certsreqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)

    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02002998
@skip_if_broken_ubuntu_ssl
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options.

    The same set of client protocols is tried with the default
    certificate requirement, with CERT_OPTIONAL and with
    CERT_REQUIRED, followed by a few combinations in which the server
    disables individual protocol versions.
    """
    if support.verbose:
        sys.stdout.write("\n")

    tls = ssl.PROTOCOL_TLS
    tlsv1 = ssl.PROTOCOL_TLSv1
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Default certificate requirement.
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls, tls, True)
    try_protocol_combo(tls, tlsv1, 'TLSv1')

    # Same combinations with explicit certificate requirements.
    for certsreqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, certsreqs)
        try_protocol_combo(tls, tls, True, certsreqs)
        try_protocol_combo(tls, tlsv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, tlsv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3037
3038
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options.

    SSLv3 paired with SSLv3 must negotiate 'SSLv3' for every
    certificate requirement; the remaining combinations are expected
    to fail.
    """
    if support.verbose:
        sys.stdout.write("\n")

    sslv3 = ssl.PROTOCOL_SSLv3
    try_protocol_combo(sslv3, sslv3, 'SSLv3')
    for certsreqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3058
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options.

    TLSv1 paired with TLSv1 must negotiate 'TLSv1' for every
    certificate requirement; SSLv2, SSLv3 and a client with TLSv1
    disabled are expected to fail.
    """
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1 = ssl.PROTOCOL_TLSv1
    try_protocol_combo(tlsv1, tlsv1, 'TLSv1')
    for certsreqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)

    # Legacy protocols are only present in some builds.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3073
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.

    Also pairs TLSv1.1 with the older TLSv1 in both directions, which
    is expected to fail.
    """
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Legacy protocols are only present in some builds.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3093
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.

    Also pairs TLSv1.2 with the older TLSv1 and TLSv1.1 in both
    directions, which is expected to fail.
    """
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)

    # Legacy protocols are only present in some builds.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # Fixed older TLS versions must not interoperate in either direction.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3117
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    A fixed sequence of messages is sent to a STARTTLS-capable echo
    server.  When the server answers "ok" to STARTTLS the socket is
    wrapped; when it answers "ok" to ENDTLS the connection is
    unwrapped and the exchange continues in clear text.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    def say(fmt, *args):
        # Verbose-only progress output; formatting is skipped otherwise.
        if support.verbose:
            sys.stdout.write(fmt % args if args else fmt)

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    tls_active = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        say("\n")
        for indata in msgs:
            say(" client: sending %r...\n", indata)
            # Talk through whichever layer is currently active.
            if tls_active:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            accepted = msg.startswith(b"ok")
            if indata == b"STARTTLS" and accepted:
                # STARTTLS ok, switch to secure mode
                say(" client: read %r from server, starting TLS...\n", msg)
                conn = test_wrap_socket(s)
                tls_active = True
            elif indata == b"ENDTLS" and accepted:
                # ENDTLS ok, switch back to clear text
                say(" client: read %r from server, ending TLS...\n", msg)
                s = conn.unwrap()
                tls_active = False
            else:
                say(" client: read %r from server\n", msg)
        say(" client: closing connection.\n")
        if tls_active:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003174
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    CERTFILE is read from disk and then fetched over HTTPS from the
    server created by make_https_server(); both copies must be equal.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as local_file:
        d1 = local_file.read()
    d2 = ''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003200
def test_asyncore_server(self):
    """Check the example asyncore integration.

    One message is sent to an AsyncoreEchoServer and the reply must be
    the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    expected = indata.lower()
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        client.write(indata)
        outdata = client.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        # Tell the server this connection is finished before closing.
        client.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        client.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003229
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send/recv flavour is driven through a table.  Methods marked
    as expected to succeed must round-trip data through the echo server
    (which answers with the lower-cased payload); the others must raise
    a ValueError whose message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the messages below use the ``!r`` / ``!s`` conversions.
        # ``{x:r}`` is a *format spec*, which bytes objects and
        # exceptions reject with TypeError, so a genuine failure would
        # have been masked by a formatting error.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was sent before the
                # rejected receive call, so its echo is still pending
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3365
def test_recv_zero(self):
    """recv(0)/read(0) return no data and never block."""
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ receives (exc_type, exc_value, traceback); pass all
    # three, exactly as a ``with server:`` block would on a clean exit.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = test_wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
3385
def test_nonblocking_send(self):
    """A non-blocking TLS socket raises a want-read/want-write error
    once send() can no longer make progress."""
    echo_server = ThreadedEchoServer(
        CERTFILE,
        certreqs=ssl.CERT_NONE,
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        cacerts=CERTFILE,
        chatty=True,
        connectionchatty=False,
    )
    with echo_server:
        client = test_wrap_socket(
            socket.socket(),
            server_side=False,
            certfile=CERTFILE,
            ca_certs=CERTFILE,
            cert_reqs=ssl.CERT_NONE,
            ssl_version=ssl.PROTOCOL_TLS_CLIENT,
        )
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # Keep pushing data: eventually the buffers fill up and the
        # call would have to block, which surfaces as an exception.
        chunk = bytearray(8192)
        with self.assertRaises((ssl.SSLWantWriteError,
                                ssl.SSLWantReadError)):
            while True:
                client.send(chunk)

        # Back to blocking mode before shutting the connection down.
        client.setblocking(True)
        client.close()
3415
def test_handshake_timeout(self):
    """The SSL handshake must respect the socket timeout (issue #5103).

    A plain TCP listener accepts connections but never speaks TLS, so
    both handshake entry points (wrapping a connected socket, and
    connecting a wrapped socket) have to give up with socket.timeout.
    """
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    stop = False

    def accept_silently():
        listener.listen()
        ready.set()
        # Hold on to accepted sockets so they stay open rather than
        # being closed by garbage collection.
        held = []
        while not stop:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                held.append(listener.accept()[0])
        for conn in held:
            conn.close()

    worker = threading.Thread(target=accept_silently)
    worker.start()
    ready.wait()

    try:
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Wrapping triggers the handshake, which must time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Connecting triggers the handshake, which must time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop = True
        worker.join()
        listener.close()
3464
def test_server_accept(self):
    """accept() works on an SSLSocket created through
    SSLContext.wrap_socket() (issue #16357)."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    remote = None
    peer = None

    def accept_and_echo():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() and echo four bytes.
        listening.set()
        remote, peer = server.accept()
        remote.send(remote.recv(4))

    acceptor = threading.Thread(target=accept_and_echo)
    acceptor.start()
    # The client connects only once the server is listening.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    acceptor.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3505
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected, \
            self.assertRaises(OSError) as caught:
        unconnected.getpeercert()
    self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3512
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected, \
            self.assertRaises(OSError) as caught:
        unconnected.do_handshake()
    self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3519
def test_no_shared_ciphers(self):
    """A handshake fails when client and server share no cipher suite."""
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force disjoint suites on client and server
    client_context.set_ciphers("AES128")
    server_context.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s, self.assertRaises(OSError):
            s.connect((HOST, server.port))
    # The server side records the reason for the failed handshake.
    self.assertIn("no shared cipher", server.conn_errors[0])
3533
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is neither a version nor an
            # underlying SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:
                # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Once the socket is closed both are reset again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3557
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    """With TLS 1.0-1.2 disabled, the connection negotiates TLSv1.3."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # Leave TLS 1.3 as the only protocol version still enabled.
    older_versions = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    context.options |= older_versions
    tls13_ciphers = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], tls13_ciphers)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003575
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    """minimum_version/maximum_version bound the negotiated protocol."""
    client_context, server_context, hostname = testing_context()
    TLSVersion = ssl.TLSVersion

    # Scenario 1: client TLSv1.0 to 1.2, server only TLSv1.2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1_2
    server_context.minimum_version = TLSVersion.TLSv1_2
    server_context.maximum_version = TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')

    # Scenario 2: client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = TLSVersion.TLSv1
    server_context.maximum_version = TLSVersion.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')

    # Scenario 3: client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = TLSVersion.TLSv1_2
    server_context.maximum_version = TLSVersion.TLSv1_2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(e.exception))
3614
3615
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    """A client pinned to SSLv3 negotiates SSLv3 with a server that
    lowers its minimum version to SSLv3."""
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3629
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Elliptic curve-based Diffie Hellman key exchange is enabled by
    default on SSL contexts (issue #21015)."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3649
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive connections are made.  For each one the client's
    channel binding data must be non-None, have the expected length
    and match what the server reports for the same connection (the
    server answers the ``CB tls-unique`` command with the repr of its
    own value).  The data of the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(cb_data), 48)
            else:
                self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the data of *this* connection; re-checking
            # cb_data here would only repeat the assertions made above.
            self.assertIsNotNone(new_cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(new_cb_data), 48)
            else:
                self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003709
def test_compression(self):
    """The reported compression method is None, 'ZLIB' or 'RLE'."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context, server_context,
        chatty=True, connectionchatty=True,
        sni_name=hostname,
    )
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, {None, 'ZLIB', 'RLE'})
3718
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION on both sides no compression is reported."""
    client_context, server_context, hostname = testing_context()
    # Client first, then server -- same order as the plain assignments.
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(
        client_context, server_context,
        chatty=True, connectionchatty=True,
        sni_name=hostname,
    )
    self.assertIs(stats['compression'], None)
3729
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The server loads DH parameters and is restricted to the ``kEDH``
    cipher group; the negotiated cipher name must then contain an
    ``ADH``, ``EDH`` or ``DHE`` component.
    """
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is indexed for its first element, the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole name; ``cipher[0]`` would only be its first
        # character because ``cipher`` is already the name string.
        self.fail("Non-DH cipher: " + cipher)
3745
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    """set_ecdh_curve() on either side: matching setups connect, and a
    curve mismatch is expected to fail on OpenSSL >= 1.1.0."""
    no_old_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= no_old_tls
    handshake_failed = False
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        handshake_failed = True
    # OpenSSL 1.0.2 does not fail although it should.
    if not handshake_failed and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3784
def test_selected_alpn_protocol(self):
    """selected_alpn_protocol() is None unless ALPN is used."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context, server_context,
        chatty=True, connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3792
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """selected_alpn_protocol() is None unless ALPN is used by the
    client, even when the server advertises protocols."""
    client_context, server_context, hostname = testing_context()
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(
        client_context, server_context,
        chatty=True, connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3802
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """ALPN negotiation picks the expected protocol for several client
    offers against a fixed server list."""
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Either the collected statistics or the handshake error.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            stats = e

        expect_handshake_error = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if expect_handshake_error:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(stats['server_alpn_protocols']):
            server_result = stats['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3842
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context, server_context,
        chatty=True, connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3850
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """NPN negotiation picks the expected protocol for several client
    lists against a fixed server list."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols configured on the client, expected selection)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(
            client_context, server_context,
            chatty=True, connectionchatty=True,
            sni_name=hostname,
        )
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3876
def sni_contexts(self):
    """Build the contexts used by the SNI tests.

    Returns a ``(server_context, other_context, client_context)``
    tuple: two server-side contexts loaded with SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively, and a client context that loads
    SIGNING_CA as its verify location.
    """
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_side(SIGNED_CERTFILE)
    other_context = server_side(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3885
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has *name* as a
    commonName entry of its subject."""
    subject = stats['peercert']['subject']
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, subject)
3889
@needs_sni
def test_sni_callback(self):
    """The servername callback sees the SNI name and can swap the
    context; removing the callback restores the default certificate."""
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate loaded into other_context was selected
    self.check_common_name(stats, 'fakehostname')

    calls = []
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    calls = []
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(calls, [])
3930
@needs_sni
def test_sni_callback_alert(self):
    """A TLS alert returned by the callback is reflected to the
    connecting client."""
    server_context, other_context, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(deny_access)
    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003944
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003945 @needs_sni
3946 def test_sni_callback_raising(self):
3947 # Raising fails the connection with a TLS handshake failure alert.
3948 server_context, other_context, client_context = self.sni_contexts()
3949
3950 def cb_raising(ssl_sock, server_name, initial_context):
3951 1/0
3952 server_context.set_servername_callback(cb_raising)
3953
3954 with self.assertRaises(ssl.SSLError) as cm, \
3955 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003956 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003957 chatty=False,
3958 sni_name='supermessage')
3959 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3960 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003961
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003962 @needs_sni
3963 def test_sni_callback_wrong_return_type(self):
3964 # Returning the wrong return type terminates the TLS connection
3965 # with an internal error alert.
3966 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003967
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003968 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3969 return "foo"
3970 server_context.set_servername_callback(cb_wrong_return_type)
3971
3972 with self.assertRaises(ssl.SSLError) as cm, \
3973 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003974 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003975 chatty=False,
3976 sni_name='supermessage')
3977 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3978 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003979
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003980 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003981 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003982 client_context.set_ciphers("AES128:AES256")
3983 server_context.set_ciphers("AES256")
3984 expected_algs = [
3985 "AES256", "AES-256",
3986 # TLS 1.3 ciphers are always enabled
3987 "TLS_CHACHA20", "TLS_AES",
3988 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003989
Christian Heimesa170fa12017-09-15 20:27:30 +02003990 stats = server_params_test(client_context, server_context,
3991 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003992 ciphers = stats['server_shared_ciphers'][0]
3993 self.assertGreater(len(ciphers), 0)
3994 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003995 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003996 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003997
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003998 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003999 client_context, server_context, hostname = testing_context()
4000 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004001
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004002 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004003 s = client_context.wrap_socket(socket.socket(),
4004 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004005 s.connect((HOST, server.port))
4006 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004007
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004008 self.assertRaises(ValueError, s.read, 1024)
4009 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004010
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004011 def test_sendfile(self):
4012 TEST_DATA = b"x" * 512
4013 with open(support.TESTFN, 'wb') as f:
4014 f.write(TEST_DATA)
4015 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004016 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004017 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004018 context.load_verify_locations(SIGNING_CA)
4019 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004020 server = ThreadedEchoServer(context=context, chatty=False)
4021 with server:
4022 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004023 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004024 with open(support.TESTFN, 'rb') as file:
4025 s.sendfile(file)
4026 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004027
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004028 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004029 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004030 # TODO: sessions aren't compatible with TLSv1.3 yet
4031 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004034 stats = server_params_test(client_context, server_context,
4035 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004036 session = stats['session']
4037 self.assertTrue(session.id)
4038 self.assertGreater(session.time, 0)
4039 self.assertGreater(session.timeout, 0)
4040 self.assertTrue(session.has_ticket)
4041 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4042 self.assertGreater(session.ticket_lifetime_hint, 0)
4043 self.assertFalse(stats['session_reused'])
4044 sess_stat = server_context.session_stats()
4045 self.assertEqual(sess_stat['accept'], 1)
4046 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004047
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004048 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004049 stats = server_params_test(client_context, server_context,
4050 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004051 sess_stat = server_context.session_stats()
4052 self.assertEqual(sess_stat['accept'], 2)
4053 self.assertEqual(sess_stat['hits'], 1)
4054 self.assertTrue(stats['session_reused'])
4055 session2 = stats['session']
4056 self.assertEqual(session2.id, session.id)
4057 self.assertEqual(session2, session)
4058 self.assertIsNot(session2, session)
4059 self.assertGreaterEqual(session2.time, session.time)
4060 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004061
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004062 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004063 stats = server_params_test(client_context, server_context,
4064 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004065 self.assertFalse(stats['session_reused'])
4066 session3 = stats['session']
4067 self.assertNotEqual(session3.id, session.id)
4068 self.assertNotEqual(session3, session)
4069 sess_stat = server_context.session_stats()
4070 self.assertEqual(sess_stat['accept'], 3)
4071 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004072
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004073 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004074 stats = server_params_test(client_context, server_context,
4075 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004076 self.assertTrue(stats['session_reused'])
4077 session4 = stats['session']
4078 self.assertEqual(session4.id, session.id)
4079 self.assertEqual(session4, session)
4080 self.assertGreaterEqual(session4.time, session.time)
4081 self.assertGreaterEqual(session4.timeout, session.timeout)
4082 sess_stat = server_context.session_stats()
4083 self.assertEqual(sess_stat['accept'], 4)
4084 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004086 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004087 client_context, server_context, hostname = testing_context()
4088 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004089
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004090 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004091 client_context.options |= ssl.OP_NO_TLSv1_3
4092 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004093
Christian Heimesa170fa12017-09-15 20:27:30 +02004094 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004095 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004096 with client_context.wrap_socket(socket.socket(),
4097 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004098 # session is None before handshake
4099 self.assertEqual(s.session, None)
4100 self.assertEqual(s.session_reused, None)
4101 s.connect((HOST, server.port))
4102 session = s.session
4103 self.assertTrue(session)
4104 with self.assertRaises(TypeError) as e:
4105 s.session = object
Miss Islington (bot)42198572018-06-11 17:58:06 -07004106 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004107
Christian Heimesa170fa12017-09-15 20:27:30 +02004108 with client_context.wrap_socket(socket.socket(),
4109 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004110 s.connect((HOST, server.port))
4111 # cannot set session after handshake
4112 with self.assertRaises(ValueError) as e:
4113 s.session = session
4114 self.assertEqual(str(e.exception),
4115 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004116
Christian Heimesa170fa12017-09-15 20:27:30 +02004117 with client_context.wrap_socket(socket.socket(),
4118 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004119 # can set session before handshake and before the
4120 # connection was established
4121 s.session = session
4122 s.connect((HOST, server.port))
4123 self.assertEqual(s.session.id, session.id)
4124 self.assertEqual(s.session, session)
4125 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004126
Christian Heimesa170fa12017-09-15 20:27:30 +02004127 with client_context2.wrap_socket(socket.socket(),
4128 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004129 # cannot re-use session with a different SSLContext
4130 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004131 s.session = session
4132 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004133 self.assertEqual(str(e.exception),
4134 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004135
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004136
def test_main(verbose=False):
    # Entry point for running this module's tests.
    #
    # When support.verbose is set, print the OpenSSL version, the platform
    # and a few ssl constants first.  Then check that every certificate
    # file exists, raising support.TestFailed otherwise, and run the test
    # classes (plus NetworkedTests when the 'network' resource is
    # enabled) between threading_setup() and threading_cleanup().
    #
    # The 'verbose' parameter is accepted but not read; verbosity comes
    # from support.verbose.
    if support.verbose:
        import warnings
        plats = {}
        # platform.linux_distribution() is deprecated and missing from
        # newer Python releases; probe it only when it exists so the
        # banner does not fail with AttributeError.
        linux_distribution = getattr(platform, 'linux_distribution', None)
        if linux_distribution is not None:
            plats['Linux'] = linux_distribution
        plats['Mac'] = platform.mac_ver
        plats['Windows'] = platform.win32_ver
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                DeprecationWarning,
            )
            # Use the first probe that reports a non-empty first field.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The ssl module does not define this option; skip the line.
            pass

    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004189 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004190
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()