blob: f1b9565c8d91a52bf0d2b15bdb107b1fe3201640 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Miss Islington (bot)e3228a32018-08-14 10:52:27 -0400120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou23df4832010-08-04 17:14:06 +0000183# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
184def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200185 if hasattr(ssl, 'PROTOCOL_SSLv2'):
186 @functools.wraps(func)
187 def f(*args, **kwargs):
188 try:
189 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
190 except ssl.SSLError:
191 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
192 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
193 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
194 return func(*args, **kwargs)
195 return f
196 else:
197 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100199needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
200
Antoine Pitrou23df4832010-08-04 17:14:06 +0000201
Christian Heimesd0486372016-09-10 23:23:33 +0200202def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
203 cert_reqs=ssl.CERT_NONE, ca_certs=None,
204 ciphers=None, certfile=None, keyfile=None,
205 **kwargs):
206 context = ssl.SSLContext(ssl_version)
207 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200208 if cert_reqs == ssl.CERT_NONE:
209 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200210 context.verify_mode = cert_reqs
211 if ca_certs is not None:
212 context.load_verify_locations(ca_certs)
213 if certfile is not None or keyfile is not None:
214 context.load_cert_chain(certfile, keyfile)
215 if ciphers is not None:
216 context.set_ciphers(ciphers)
217 return context.wrap_socket(sock, **kwargs)
218
Christian Heimesa170fa12017-09-15 20:27:30 +0200219
220def testing_context(server_cert=SIGNED_CERTFILE):
221 """Create context
222
223 client_context, server_context, hostname = testing_context()
224 """
225 if server_cert == SIGNED_CERTFILE:
226 hostname = SIGNED_CERTFILE_HOSTNAME
227 elif server_cert == SIGNED_CERTFILE2:
228 hostname = SIGNED_CERTFILE2_HOSTNAME
229 else:
230 raise ValueError(server_cert)
231
232 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
233 client_context.load_verify_locations(SIGNING_CA)
234
235 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
236 server_context.load_cert_chain(server_cert)
Christian Heimes2756ef32018-09-23 09:22:52 +0200237 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200238
239 return client_context, server_context, hostname
240
241
Antoine Pitrou152efa22010-05-16 18:19:27 +0000242class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000243
Antoine Pitrou480a1242010-04-28 21:37:09 +0000244 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000245 ssl.CERT_NONE
246 ssl.CERT_OPTIONAL
247 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100248 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100249 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100250 if ssl.HAS_ECDH:
251 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100252 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
253 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000254 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100255 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700256 ssl.OP_NO_SSLv2
257 ssl.OP_NO_SSLv3
258 ssl.OP_NO_TLSv1
259 ssl.OP_NO_TLSv1_3
260 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
261 ssl.OP_NO_TLSv1_1
262 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200263 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000264
Christian Heimes89c20512018-02-27 11:17:32 +0100265 def test_private_init(self):
266 with self.assertRaisesRegex(TypeError, "public constructor"):
267 with socket.socket() as s:
268 ssl.SSLSocket(s)
269
Antoine Pitrou172f0252014-04-18 20:33:08 +0200270 def test_str_for_enums(self):
271 # Make sure that the PROTOCOL_* constants have enum-like string
272 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200273 proto = ssl.PROTOCOL_TLS
274 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200275 ctx = ssl.SSLContext(proto)
276 self.assertIs(ctx.protocol, proto)
277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n RAND_status is %d (%s)\n"
282 % (v, (v and "sufficient randomness") or
283 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200284
285 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
286 self.assertEqual(len(data), 16)
287 self.assertEqual(is_cryptographic, v == 1)
288 if v:
289 data = ssl.RAND_bytes(16)
290 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200291 else:
292 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200293
Victor Stinner1e81a392013-12-19 16:47:04 +0100294 # negative num is invalid
295 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
296 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
297
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100298 if hasattr(ssl, 'RAND_egd'):
299 self.assertRaises(TypeError, ssl.RAND_egd, 1)
300 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000301 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200302 ssl.RAND_add(b"this is a random bytes object", 75.0)
303 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000304
Christian Heimesf77b4b22013-08-21 13:26:05 +0200305 @unittest.skipUnless(os.name == 'posix', 'requires posix')
306 def test_random_fork(self):
307 status = ssl.RAND_status()
308 if not status:
309 self.fail("OpenSSL's PRNG has insufficient randomness")
310
311 rfd, wfd = os.pipe()
312 pid = os.fork()
313 if pid == 0:
314 try:
315 os.close(rfd)
316 child_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(child_random), 16)
318 os.write(wfd, child_random)
319 os.close(wfd)
320 except BaseException:
321 os._exit(1)
322 else:
323 os._exit(0)
324 else:
325 os.close(wfd)
326 self.addCleanup(os.close, rfd)
327 _, status = os.waitpid(pid, 0)
328 self.assertEqual(status, 0)
329
330 child_random = os.read(rfd, 16)
331 self.assertEqual(len(child_random), 16)
332 parent_random = ssl.RAND_pseudo_bytes(16)[0]
333 self.assertEqual(len(parent_random), 16)
334
335 self.assertNotEqual(child_random, parent_random)
336
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -0700337 maxDiff = None
338
Antoine Pitrou480a1242010-04-28 21:37:09 +0000339 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000340 # note that this uses an 'unofficial' function in _ssl.c,
341 # provided solely for this test, to exercise the certificate
342 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100343 self.assertEqual(
344 ssl._ssl._test_decode_cert(CERTFILE),
345 CERTFILE_INFO
346 )
347 self.assertEqual(
348 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
349 SIGNED_CERTFILE_INFO
350 )
351
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200352 # Issue #13034: the subjectAltName in some certificates
353 # (notably projects.developer.nokia.com:443) wasn't parsed
354 p = ssl._ssl._test_decode_cert(NOKIACERT)
355 if support.verbose:
356 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
357 self.assertEqual(p['subjectAltName'],
358 (('DNS', 'projects.developer.nokia.com'),
359 ('DNS', 'projects.forum.nokia.com'))
360 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100361 # extra OCSP and AIA fields
362 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
363 self.assertEqual(p['caIssuers'],
364 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
365 self.assertEqual(p['crlDistributionPoints'],
366 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000367
Christian Heimes824f7f32013-08-17 00:54:47 +0200368 def test_parse_cert_CVE_2013_4238(self):
369 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
370 if support.verbose:
371 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
372 subject = ((('countryName', 'US'),),
373 (('stateOrProvinceName', 'Oregon'),),
374 (('localityName', 'Beaverton'),),
375 (('organizationName', 'Python Software Foundation'),),
376 (('organizationalUnitName', 'Python Core Development'),),
377 (('commonName', 'null.python.org\x00example.org'),),
378 (('emailAddress', 'python-dev@python.org'),))
379 self.assertEqual(p['subject'], subject)
380 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200381 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
382 san = (('DNS', 'altnull.python.org\x00example.com'),
383 ('email', 'null@python.org\x00user@example.org'),
384 ('URI', 'http://null.python.org\x00http://example.org'),
385 ('IP Address', '192.0.2.1'),
386 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
387 else:
388 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
389 san = (('DNS', 'altnull.python.org\x00example.com'),
390 ('email', 'null@python.org\x00user@example.org'),
391 ('URI', 'http://null.python.org\x00http://example.org'),
392 ('IP Address', '192.0.2.1'),
393 ('IP Address', '<invalid>'))
394
395 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200396
Christian Heimes1c03abd2016-09-06 23:25:35 +0200397 def test_parse_all_sans(self):
398 p = ssl._ssl._test_decode_cert(ALLSANFILE)
399 self.assertEqual(p['subjectAltName'],
400 (
401 ('DNS', 'allsans'),
402 ('othername', '<unsupported>'),
403 ('othername', '<unsupported>'),
404 ('email', 'user@example.org'),
405 ('DNS', 'www.example.org'),
406 ('DirName',
407 ((('countryName', 'XY'),),
408 (('localityName', 'Castle Anthrax'),),
409 (('organizationName', 'Python Software Foundation'),),
410 (('commonName', 'dirname example'),))),
411 ('URI', 'https://www.python.org/'),
412 ('IP Address', '127.0.0.1'),
413 ('IP Address', '0:0:0:0:0:0:0:1\n'),
414 ('Registered ID', '1.2.3.4.5')
415 )
416 )
417
Antoine Pitrou480a1242010-04-28 21:37:09 +0000418 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000419 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 d1 = ssl.PEM_cert_to_DER_cert(pem)
422 p2 = ssl.DER_cert_to_PEM_cert(d1)
423 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000424 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000425 if not p2.startswith(ssl.PEM_HEADER + '\n'):
426 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
427 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
428 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000429
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000430 def test_openssl_version(self):
431 n = ssl.OPENSSL_VERSION_NUMBER
432 t = ssl.OPENSSL_VERSION_INFO
433 s = ssl.OPENSSL_VERSION
434 self.assertIsInstance(n, int)
435 self.assertIsInstance(t, tuple)
436 self.assertIsInstance(s, str)
437 # Some sanity checks follow
438 # >= 0.9
439 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400440 # < 3.0
441 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000442 major, minor, fix, patch, status = t
443 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400444 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000445 self.assertGreaterEqual(minor, 0)
446 self.assertLess(minor, 256)
447 self.assertGreaterEqual(fix, 0)
448 self.assertLess(fix, 256)
449 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100450 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000451 self.assertGreaterEqual(status, 0)
452 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400453 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200454 if IS_LIBRESSL:
455 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100456 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400457 else:
458 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100459 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000460
Antoine Pitrou9d543662010-04-23 23:10:32 +0000461 @support.cpython_only
462 def test_refcycle(self):
463 # Issue #7943: an SSL object doesn't create reference cycles with
464 # itself.
465 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200466 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000467 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100468 with support.check_warnings(("", ResourceWarning)):
469 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100470 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000471
Antoine Pitroua468adc2010-09-14 14:43:44 +0000472 def test_wrapped_unconnected(self):
473 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200474 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000475 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100477 self.assertRaises(OSError, ss.recv, 1)
478 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
479 self.assertRaises(OSError, ss.recvfrom, 1)
480 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
481 self.assertRaises(OSError, ss.send, b'x')
482 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800483 self.assertRaises(NotImplementedError, ss.dup)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800484 self.assertRaises(NotImplementedError, ss.sendmsg,
485 [b'x'], (), 0, ('0.0.0.0', 0))
Miss Islington (bot)6485aa62018-12-06 12:52:43 -0800486 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
487 self.assertRaises(NotImplementedError, ss.recvmsg_into,
488 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000489
Antoine Pitrou40f08742010-04-24 22:04:40 +0000490 def test_timeout(self):
491 # Issue #8524: when creating an SSL socket, the timeout of the
492 # original socket should be retained.
493 for timeout in (None, 0.0, 5.0):
494 s = socket.socket(socket.AF_INET)
495 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200496 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100497 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000498
Christian Heimesd0486372016-09-10 23:23:33 +0200499 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000500 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000501 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000502 "certfile must be specified",
503 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000504 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000505 "certfile must be specified for server-side operations",
506 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000507 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000508 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200509 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100510 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
511 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200512 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200513 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000515 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000516 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200517 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000518 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000519 ssl.wrap_socket(sock,
520 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000521 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200522 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000523 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000524 ssl.wrap_socket(sock,
525 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000526 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000527
Martin Panter3464ea22016-02-01 21:58:11 +0000528 def bad_cert_test(self, certfile):
529 """Check that trying to use the given client certificate fails"""
530 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
531 certfile)
532 sock = socket.socket()
533 self.addCleanup(sock.close)
534 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200535 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200536 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000537
538 def test_empty_cert(self):
539 """Wrapping with an empty cert file"""
540 self.bad_cert_test("nullcert.pem")
541
542 def test_malformed_cert(self):
543 """Wrapping with a badly formatted certificate (syntax error)"""
544 self.bad_cert_test("badcert.pem")
545
546 def test_malformed_key(self):
547 """Wrapping with a badly formatted key (syntax error)"""
548 self.bad_cert_test("badkey.pem")
549
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000550 def test_match_hostname(self):
551 def ok(cert, hostname):
552 ssl.match_hostname(cert, hostname)
553 def fail(cert, hostname):
554 self.assertRaises(ssl.CertificateError,
555 ssl.match_hostname, cert, hostname)
556
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100557 # -- Hostname matching --
558
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000559 cert = {'subject': ((('commonName', 'example.com'),),)}
560 ok(cert, 'example.com')
561 ok(cert, 'ExAmple.cOm')
562 fail(cert, 'www.example.com')
563 fail(cert, '.example.com')
564 fail(cert, 'example.org')
565 fail(cert, 'exampleXcom')
566
567 cert = {'subject': ((('commonName', '*.a.com'),),)}
568 ok(cert, 'foo.a.com')
569 fail(cert, 'bar.foo.a.com')
570 fail(cert, 'a.com')
571 fail(cert, 'Xa.com')
572 fail(cert, '.a.com')
573
Mandeep Singhede2ac92017-11-27 04:01:27 +0530574 # only match wildcards when they are the only thing
575 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000576 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530577 fail(cert, 'foo.com')
578 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000579 fail(cert, 'bar.com')
580 fail(cert, 'foo.a.com')
581 fail(cert, 'bar.foo.com')
582
Christian Heimes824f7f32013-08-17 00:54:47 +0200583 # NULL bytes are bad, CVE-2013-4073
584 cert = {'subject': ((('commonName',
585 'null.python.org\x00example.org'),),)}
586 ok(cert, 'null.python.org\x00example.org') # or raise an error?
587 fail(cert, 'example.org')
588 fail(cert, 'null.python.org')
589
Georg Brandl72c98d32013-10-27 07:16:53 +0100590 # error cases with wildcards
591 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
592 fail(cert, 'bar.foo.a.com')
593 fail(cert, 'a.com')
594 fail(cert, 'Xa.com')
595 fail(cert, '.a.com')
596
597 cert = {'subject': ((('commonName', 'a.*.com'),),)}
598 fail(cert, 'a.foo.com')
599 fail(cert, 'a..com')
600 fail(cert, 'a.com')
601
602 # wildcard doesn't match IDNA prefix 'xn--'
603 idna = 'püthon.python.org'.encode("idna").decode("ascii")
604 cert = {'subject': ((('commonName', idna),),)}
605 ok(cert, idna)
606 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
607 fail(cert, idna)
608 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
609 fail(cert, idna)
610
611 # wildcard in first fragment and IDNA A-labels in sequent fragments
612 # are supported.
613 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
614 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530615 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
616 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100617 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
618 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
619
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000620 # Slightly fake real-world example
621 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
622 'subject': ((('commonName', 'linuxfrz.org'),),),
623 'subjectAltName': (('DNS', 'linuxfr.org'),
624 ('DNS', 'linuxfr.com'),
625 ('othername', '<unsupported>'))}
626 ok(cert, 'linuxfr.org')
627 ok(cert, 'linuxfr.com')
628 # Not a "DNS" entry
629 fail(cert, '<unsupported>')
630 # When there is a subjectAltName, commonName isn't used
631 fail(cert, 'linuxfrz.org')
632
633 # A pristine real-world example
634 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
635 'subject': ((('countryName', 'US'),),
636 (('stateOrProvinceName', 'California'),),
637 (('localityName', 'Mountain View'),),
638 (('organizationName', 'Google Inc'),),
639 (('commonName', 'mail.google.com'),))}
640 ok(cert, 'mail.google.com')
641 fail(cert, 'gmail.com')
642 # Only commonName is considered
643 fail(cert, 'California')
644
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100645 # -- IPv4 matching --
646 cert = {'subject': ((('commonName', 'example.com'),),),
647 'subjectAltName': (('DNS', 'example.com'),
648 ('IP Address', '10.11.12.13'),
649 ('IP Address', '14.15.16.17'))}
650 ok(cert, '10.11.12.13')
651 ok(cert, '14.15.16.17')
652 fail(cert, '14.15.16.18')
653 fail(cert, 'example.net')
654
655 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800656 if hasattr(socket, 'AF_INET6'):
657 cert = {'subject': ((('commonName', 'example.com'),),),
658 'subjectAltName': (
659 ('DNS', 'example.com'),
660 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
661 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
662 ok(cert, '2001::cafe')
663 ok(cert, '2003::baba')
664 fail(cert, '2003::bebe')
665 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100666
667 # -- Miscellaneous --
668
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000669 # Neither commonName nor subjectAltName
670 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
671 'subject': ((('countryName', 'US'),),
672 (('stateOrProvinceName', 'California'),),
673 (('localityName', 'Mountain View'),),
674 (('organizationName', 'Google Inc'),))}
675 fail(cert, 'mail.google.com')
676
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200677 # No DNS entry in subjectAltName but a commonName
678 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
679 'subject': ((('countryName', 'US'),),
680 (('stateOrProvinceName', 'California'),),
681 (('localityName', 'Mountain View'),),
682 (('commonName', 'mail.google.com'),)),
683 'subjectAltName': (('othername', 'blabla'), )}
684 ok(cert, 'mail.google.com')
685
686 # No DNS entry subjectAltName and no commonName
687 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
688 'subject': ((('countryName', 'US'),),
689 (('stateOrProvinceName', 'California'),),
690 (('localityName', 'Mountain View'),),
691 (('organizationName', 'Google Inc'),)),
692 'subjectAltName': (('othername', 'blabla'),)}
693 fail(cert, 'google.com')
694
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000695 # Empty cert / no cert
696 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
697 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
698
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200699 # Issue #17980: avoid denials of service by refusing more than one
700 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800701 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
702 with self.assertRaisesRegex(
703 ssl.CertificateError,
704 "partial wildcards in leftmost label are not supported"):
705 ssl.match_hostname(cert, 'axxb.example.com')
706
707 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
708 with self.assertRaisesRegex(
709 ssl.CertificateError,
710 "wildcard can only be present in the leftmost label"):
711 ssl.match_hostname(cert, 'www.sub.example.com')
712
713 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
714 with self.assertRaisesRegex(
715 ssl.CertificateError,
716 "too many wildcards"):
717 ssl.match_hostname(cert, 'axxbxxc.example.com')
718
719 cert = {'subject': ((('commonName', '*'),),)}
720 with self.assertRaisesRegex(
721 ssl.CertificateError,
722 "sole wildcard without additional labels are not support"):
723 ssl.match_hostname(cert, 'host')
724
725 cert = {'subject': ((('commonName', '*.com'),),)}
726 with self.assertRaisesRegex(
727 ssl.CertificateError,
728 r"hostname 'com' doesn't match '\*.com'"):
729 ssl.match_hostname(cert, 'com')
730
731 # extra checks for _inet_paton()
732 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
733 with self.assertRaises(ValueError):
734 ssl._inet_paton(invalid)
735 for ipaddr in ['127.0.0.1', '192.168.0.1']:
736 self.assertTrue(ssl._inet_paton(ipaddr))
737 if hasattr(socket, 'AF_INET6'):
738 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
739 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200740
Antoine Pitroud5323212010-10-22 18:19:07 +0000741 def test_server_side(self):
742 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200743 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000744 with socket.socket() as sock:
745 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
746 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000747
Antoine Pitroud6494802011-07-21 01:11:30 +0200748 def test_unknown_channel_binding(self):
749 # should raise ValueError for unknown type
750 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200751 s.bind(('127.0.0.1', 0))
752 s.listen()
753 c = socket.socket(socket.AF_INET)
754 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200755 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100756 with self.assertRaises(ValueError):
757 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200758 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200759
760 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
761 "'tls-unique' channel binding not available")
762 def test_tls_unique_channel_binding(self):
763 # unconnected should return None for known type
764 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200765 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100766 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200767 # the same for server-side
768 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200769 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100770 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200771
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600772 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200773 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600774 r = repr(ss)
775 with self.assertWarns(ResourceWarning) as cm:
776 ss = None
777 support.gc_collect()
778 self.assertIn(r, str(cm.warning.args[0]))
779
Christian Heimes6d7ad132013-06-09 18:02:55 +0200780 def test_get_default_verify_paths(self):
781 paths = ssl.get_default_verify_paths()
782 self.assertEqual(len(paths), 6)
783 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
784
785 with support.EnvironmentVarGuard() as env:
786 env["SSL_CERT_DIR"] = CAPATH
787 env["SSL_CERT_FILE"] = CERTFILE
788 paths = ssl.get_default_verify_paths()
789 self.assertEqual(paths.cafile, CERTFILE)
790 self.assertEqual(paths.capath, CAPATH)
791
Christian Heimes44109d72013-11-22 01:51:30 +0100792 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
793 def test_enum_certificates(self):
794 self.assertTrue(ssl.enum_certificates("CA"))
795 self.assertTrue(ssl.enum_certificates("ROOT"))
796
797 self.assertRaises(TypeError, ssl.enum_certificates)
798 self.assertRaises(WindowsError, ssl.enum_certificates, "")
799
Christian Heimesc2d65e12013-11-22 16:13:55 +0100800 trust_oids = set()
801 for storename in ("CA", "ROOT"):
802 store = ssl.enum_certificates(storename)
803 self.assertIsInstance(store, list)
804 for element in store:
805 self.assertIsInstance(element, tuple)
806 self.assertEqual(len(element), 3)
807 cert, enc, trust = element
808 self.assertIsInstance(cert, bytes)
809 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
810 self.assertIsInstance(trust, (set, bool))
811 if isinstance(trust, set):
812 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100813
814 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100815 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200816
Christian Heimes46bebee2013-06-09 19:03:31 +0200817 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100818 def test_enum_crls(self):
819 self.assertTrue(ssl.enum_crls("CA"))
820 self.assertRaises(TypeError, ssl.enum_crls)
821 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200822
Christian Heimes44109d72013-11-22 01:51:30 +0100823 crls = ssl.enum_crls("CA")
824 self.assertIsInstance(crls, list)
825 for element in crls:
826 self.assertIsInstance(element, tuple)
827 self.assertEqual(len(element), 2)
828 self.assertIsInstance(element[0], bytes)
829 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200830
Christian Heimes46bebee2013-06-09 19:03:31 +0200831
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100832 def test_asn1object(self):
833 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
834 '1.3.6.1.5.5.7.3.1')
835
836 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
837 self.assertEqual(val, expected)
838 self.assertEqual(val.nid, 129)
839 self.assertEqual(val.shortname, 'serverAuth')
840 self.assertEqual(val.longname, 'TLS Web Server Authentication')
841 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
844
845 val = ssl._ASN1Object.fromnid(129)
846 self.assertEqual(val, expected)
847 self.assertIsInstance(val, ssl._ASN1Object)
848 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100849 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
850 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100851 for i in range(1000):
852 try:
853 obj = ssl._ASN1Object.fromnid(i)
854 except ValueError:
855 pass
856 else:
857 self.assertIsInstance(obj.nid, int)
858 self.assertIsInstance(obj.shortname, str)
859 self.assertIsInstance(obj.longname, str)
860 self.assertIsInstance(obj.oid, (str, type(None)))
861
862 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
863 self.assertEqual(val, expected)
864 self.assertIsInstance(val, ssl._ASN1Object)
865 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
866 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
867 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100868 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
869 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100870
Christian Heimes72d28502013-11-23 13:56:58 +0100871 def test_purpose_enum(self):
872 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
873 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
874 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
875 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
876 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
877 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
878 '1.3.6.1.5.5.7.3.1')
879
880 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
881 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
882 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
883 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
884 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
885 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
886 '1.3.6.1.5.5.7.3.2')
887
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100888 def test_unsupported_dtls(self):
889 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
890 self.addCleanup(s.close)
891 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200892 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100893 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200894 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100895 with self.assertRaises(NotImplementedError) as cx:
896 ctx.wrap_socket(s)
897 self.assertEqual(str(cx.exception), "only stream sockets are supported")
898
Antoine Pitrouc695c952014-04-28 20:57:36 +0200899 def cert_time_ok(self, timestring, timestamp):
900 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
901
902 def cert_time_fail(self, timestring):
903 with self.assertRaises(ValueError):
904 ssl.cert_time_to_seconds(timestring)
905
906 @unittest.skipUnless(utc_offset(),
907 'local time needs to be different from UTC')
908 def test_cert_time_to_seconds_timezone(self):
909 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
910 # results if local timezone is not UTC
911 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
912 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
913
914 def test_cert_time_to_seconds(self):
915 timestring = "Jan 5 09:34:43 2018 GMT"
916 ts = 1515144883.0
917 self.cert_time_ok(timestring, ts)
918 # accept keyword parameter, assert its name
919 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
920 # accept both %e and %d (space or zero generated by strftime)
921 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
922 # case-insensitive
923 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
924 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
925 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
926 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
927 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
928 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
929 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
930 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
931
932 newyear_ts = 1230768000.0
933 # leap seconds
934 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
935 # same timestamp
936 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
937
938 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
939 # allow 60th second (even if it is not a leap second)
940 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
941 # allow 2nd leap second for compatibility with time.strptime()
942 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
943 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
944
Mike53f7a7c2017-12-14 14:04:53 +0300945 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200946 # 99991231235959Z (rfc 5280)
947 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
948
949 @support.run_with_locale('LC_ALL', '')
950 def test_cert_time_to_seconds_locale(self):
951 # `cert_time_to_seconds()` should be locale independent
952
953 def local_february_name():
954 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
955
956 if local_february_name().lower() == 'feb':
957 self.skipTest("locale-specific month name needs to be "
958 "different from C locale")
959
960 # locale-independent
961 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
962 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
963
Martin Panter3840b2a2016-03-27 01:53:46 +0000964 def test_connect_ex_error(self):
965 server = socket.socket(socket.AF_INET)
966 self.addCleanup(server.close)
967 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200968 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000969 cert_reqs=ssl.CERT_REQUIRED)
970 self.addCleanup(s.close)
971 rc = s.connect_ex((HOST, port))
972 # Issue #19919: Windows machines or VMs hosted on Windows
973 # machines sometimes return EWOULDBLOCK.
974 errors = (
975 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
976 errno.EWOULDBLOCK,
977 )
978 self.assertIn(rc, errors)
979
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100980
Antoine Pitrou152efa22010-05-16 18:19:27 +0000981class ContextTests(unittest.TestCase):
982
Antoine Pitrou23df4832010-08-04 17:14:06 +0000983 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000984 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100985 for protocol in PROTOCOLS:
986 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200987 ctx = ssl.SSLContext()
988 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000989 self.assertRaises(ValueError, ssl.SSLContext, -1)
990 self.assertRaises(ValueError, ssl.SSLContext, 42)
991
Antoine Pitrou23df4832010-08-04 17:14:06 +0000992 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000993 def test_protocol(self):
994 for proto in PROTOCOLS:
995 ctx = ssl.SSLContext(proto)
996 self.assertEqual(ctx.protocol, proto)
997
998 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200999 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001000 ctx.set_ciphers("ALL")
1001 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001002 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001003 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001004
Christian Heimes892d66e2018-01-29 14:10:18 +01001005 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1006 "Test applies only to Python default ciphers")
1007 def test_python_ciphers(self):
1008 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1009 ciphers = ctx.get_ciphers()
1010 for suite in ciphers:
1011 name = suite['name']
1012 self.assertNotIn("PSK", name)
1013 self.assertNotIn("SRP", name)
1014 self.assertNotIn("MD5", name)
1015 self.assertNotIn("RC4", name)
1016 self.assertNotIn("3DES", name)
1017
Christian Heimes25bfcd52016-09-06 00:04:45 +02001018 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1019 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001020 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001021 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001022 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001023 self.assertIn('AES256-GCM-SHA384', names)
1024 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001025
Antoine Pitrou23df4832010-08-04 17:14:06 +00001026 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001027 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001028 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001029 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001030 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001031 # SSLContext also enables these by default
1032 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001033 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1034 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001035 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001036 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001037 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001038 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001039 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1040 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001041 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001042 # Ubuntu has OP_NO_SSLv3 forced on by default
1043 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001044 else:
1045 with self.assertRaises(ValueError):
1046 ctx.options = 0
1047
Christian Heimesa170fa12017-09-15 20:27:30 +02001048 def test_verify_mode_protocol(self):
1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001050 # Default value
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1052 ctx.verify_mode = ssl.CERT_OPTIONAL
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1054 ctx.verify_mode = ssl.CERT_REQUIRED
1055 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1056 ctx.verify_mode = ssl.CERT_NONE
1057 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1058 with self.assertRaises(TypeError):
1059 ctx.verify_mode = None
1060 with self.assertRaises(ValueError):
1061 ctx.verify_mode = 42
1062
Christian Heimesa170fa12017-09-15 20:27:30 +02001063 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1065 self.assertFalse(ctx.check_hostname)
1066
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1069 self.assertTrue(ctx.check_hostname)
1070
Christian Heimes61d478c2018-01-27 15:51:38 +01001071 def test_hostname_checks_common_name(self):
1072 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1073 self.assertTrue(ctx.hostname_checks_common_name)
1074 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1075 ctx.hostname_checks_common_name = True
1076 self.assertTrue(ctx.hostname_checks_common_name)
1077 ctx.hostname_checks_common_name = False
1078 self.assertFalse(ctx.hostname_checks_common_name)
1079 ctx.hostname_checks_common_name = True
1080 self.assertTrue(ctx.hostname_checks_common_name)
1081 else:
1082 with self.assertRaises(AttributeError):
1083 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001084
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001085 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1086 "required OpenSSL 1.1.0g")
1087 def test_min_max_version(self):
1088 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1089 self.assertEqual(
1090 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1091 )
1092 self.assertEqual(
1093 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1094 )
1095
1096 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1097 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1098 self.assertEqual(
1099 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1100 )
1101 self.assertEqual(
1102 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1103 )
1104
1105 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1106 ctx.maximum_version = ssl.TLSVersion.TLSv1
1107 self.assertEqual(
1108 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1109 )
1110 self.assertEqual(
1111 ctx.maximum_version, ssl.TLSVersion.TLSv1
1112 )
1113
1114 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1115 self.assertEqual(
1116 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1117 )
1118
1119 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1120 self.assertIn(
1121 ctx.maximum_version,
1122 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1123 )
1124
1125 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1126 self.assertIn(
1127 ctx.minimum_version,
1128 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1129 )
1130
1131 with self.assertRaises(ValueError):
1132 ctx.minimum_version = 42
1133
1134 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1135
1136 self.assertEqual(
1137 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1138 )
1139 self.assertEqual(
1140 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1141 )
1142 with self.assertRaises(ValueError):
1143 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1144 with self.assertRaises(ValueError):
1145 ctx.maximum_version = ssl.TLSVersion.TLSv1
1146
1147
Christian Heimes2427b502013-11-23 11:24:32 +01001148 @unittest.skipUnless(have_verify_flags(),
1149 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001150 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001152 # default value
1153 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1154 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001155 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1156 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1157 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1158 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1159 ctx.verify_flags = ssl.VERIFY_DEFAULT
1160 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1161 # supports any value
1162 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1163 self.assertEqual(ctx.verify_flags,
1164 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1165 with self.assertRaises(TypeError):
1166 ctx.verify_flags = None
1167
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001170 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001171 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001172 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1173 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001174 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001175 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001176 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001177 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001178 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001179 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001180 ctx.load_cert_chain(EMPTYCERT)
1181 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001182 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001183 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1184 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1185 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001186 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001187 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001188 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001189 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001190 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001191 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1192 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001193 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001194 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001195 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001196 # Password protected key and cert
1197 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1198 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1199 ctx.load_cert_chain(CERTFILE_PROTECTED,
1200 password=bytearray(KEY_PASSWORD.encode()))
1201 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1202 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1203 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1204 bytearray(KEY_PASSWORD.encode()))
1205 with self.assertRaisesRegex(TypeError, "should be a string"):
1206 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1207 with self.assertRaises(ssl.SSLError):
1208 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1209 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1210 # openssl has a fixed limit on the password buffer.
1211 # PEM_BUFSIZE is generally set to 1kb.
1212 # Return a string larger than this.
1213 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1214 # Password callback
1215 def getpass_unicode():
1216 return KEY_PASSWORD
1217 def getpass_bytes():
1218 return KEY_PASSWORD.encode()
1219 def getpass_bytearray():
1220 return bytearray(KEY_PASSWORD.encode())
1221 def getpass_badpass():
1222 return "badpass"
1223 def getpass_huge():
1224 return b'a' * (1024 * 1024)
1225 def getpass_bad_type():
1226 return 9
1227 def getpass_exception():
1228 raise Exception('getpass error')
1229 class GetPassCallable:
1230 def __call__(self):
1231 return KEY_PASSWORD
1232 def getpass(self):
1233 return KEY_PASSWORD
1234 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1235 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1236 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1237 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1238 ctx.load_cert_chain(CERTFILE_PROTECTED,
1239 password=GetPassCallable().getpass)
1240 with self.assertRaises(ssl.SSLError):
1241 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1242 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1243 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1244 with self.assertRaisesRegex(TypeError, "must return a string"):
1245 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1246 with self.assertRaisesRegex(Exception, "getpass error"):
1247 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1248 # Make sure the password function isn't called if it isn't needed
1249 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001250
1251 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001252 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001253 ctx.load_verify_locations(CERTFILE)
1254 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1255 ctx.load_verify_locations(BYTES_CERTFILE)
1256 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1257 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001258 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001259 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001260 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001261 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001262 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001263 ctx.load_verify_locations(BADCERT)
1264 ctx.load_verify_locations(CERTFILE, CAPATH)
1265 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1266
Victor Stinner80f75e62011-01-29 11:31:20 +00001267 # Issue #10989: crash if the second argument type is invalid
1268 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1269
Christian Heimesefff7062013-11-21 03:35:02 +01001270 def test_load_verify_cadata(self):
1271 # test cadata
1272 with open(CAFILE_CACERT) as f:
1273 cacert_pem = f.read()
1274 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1275 with open(CAFILE_NEURONIO) as f:
1276 neuronio_pem = f.read()
1277 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1278
1279 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001280 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001281 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1282 ctx.load_verify_locations(cadata=cacert_pem)
1283 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1284 ctx.load_verify_locations(cadata=neuronio_pem)
1285 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1286 # cert already in hash table
1287 ctx.load_verify_locations(cadata=neuronio_pem)
1288 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1289
1290 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001291 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001292 combined = "\n".join((cacert_pem, neuronio_pem))
1293 ctx.load_verify_locations(cadata=combined)
1294 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1295
1296 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001297 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001298 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1299 neuronio_pem, "tail"]
1300 ctx.load_verify_locations(cadata="\n".join(combined))
1301 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1302
1303 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001304 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001305 ctx.load_verify_locations(cadata=cacert_der)
1306 ctx.load_verify_locations(cadata=neuronio_der)
1307 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1308 # cert already in hash table
1309 ctx.load_verify_locations(cadata=cacert_der)
1310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1311
1312 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001313 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001314 combined = b"".join((cacert_der, neuronio_der))
1315 ctx.load_verify_locations(cadata=combined)
1316 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1317
1318 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001319 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001320 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1321
1322 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1323 ctx.load_verify_locations(cadata="broken")
1324 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1325 ctx.load_verify_locations(cadata=b"broken")
1326
1327
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001328 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001329 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001330 ctx.load_dh_params(DHFILE)
1331 if os.name != 'nt':
1332 ctx.load_dh_params(BYTES_DHFILE)
1333 self.assertRaises(TypeError, ctx.load_dh_params)
1334 self.assertRaises(TypeError, ctx.load_dh_params, None)
1335 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001336 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001337 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001338 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001339 ctx.load_dh_params(CERTFILE)
1340
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001341 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001342 def test_session_stats(self):
1343 for proto in PROTOCOLS:
1344 ctx = ssl.SSLContext(proto)
1345 self.assertEqual(ctx.session_stats(), {
1346 'number': 0,
1347 'connect': 0,
1348 'connect_good': 0,
1349 'connect_renegotiate': 0,
1350 'accept': 0,
1351 'accept_good': 0,
1352 'accept_renegotiate': 0,
1353 'hits': 0,
1354 'misses': 0,
1355 'timeouts': 0,
1356 'cache_full': 0,
1357 })
1358
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001359 def test_set_default_verify_paths(self):
1360 # There's not much we can do to test that it acts as expected,
1361 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001363 ctx.set_default_verify_paths()
1364
Antoine Pitrou501da612011-12-21 09:27:41 +01001365 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001366 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001367 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001368 ctx.set_ecdh_curve("prime256v1")
1369 ctx.set_ecdh_curve(b"prime256v1")
1370 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1371 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1372 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1373 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1374
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001375 @needs_sni
1376 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001377 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001378
1379 # set_servername_callback expects a callable, or None
1380 self.assertRaises(TypeError, ctx.set_servername_callback)
1381 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1382 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1383 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1384
1385 def dummycallback(sock, servername, ctx):
1386 pass
1387 ctx.set_servername_callback(None)
1388 ctx.set_servername_callback(dummycallback)
1389
1390 @needs_sni
1391 def test_sni_callback_refcycle(self):
1392 # Reference cycles through the servername callback are detected
1393 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001394 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001395 def dummycallback(sock, servername, ctx, cycle=ctx):
1396 pass
1397 ctx.set_servername_callback(dummycallback)
1398 wr = weakref.ref(ctx)
1399 del ctx, dummycallback
1400 gc.collect()
1401 self.assertIs(wr(), None)
1402
Christian Heimes9a5395a2013-06-17 15:44:12 +02001403 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001404 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001405 self.assertEqual(ctx.cert_store_stats(),
1406 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1407 ctx.load_cert_chain(CERTFILE)
1408 self.assertEqual(ctx.cert_store_stats(),
1409 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1410 ctx.load_verify_locations(CERTFILE)
1411 self.assertEqual(ctx.cert_store_stats(),
1412 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001413 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001414 self.assertEqual(ctx.cert_store_stats(),
1415 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1416
1417 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001418 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001419 self.assertEqual(ctx.get_ca_certs(), [])
1420 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1421 ctx.load_verify_locations(CERTFILE)
1422 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001423 # but CAFILE_CACERT is a CA cert
1424 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001425 self.assertEqual(ctx.get_ca_certs(),
1426 [{'issuer': ((('organizationName', 'Root CA'),),
1427 (('organizationalUnitName', 'http://www.cacert.org'),),
1428 (('commonName', 'CA Cert Signing Authority'),),
1429 (('emailAddress', 'support@cacert.org'),)),
1430 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1431 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1432 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001433 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001434 'subject': ((('organizationName', 'Root CA'),),
1435 (('organizationalUnitName', 'http://www.cacert.org'),),
1436 (('commonName', 'CA Cert Signing Authority'),),
1437 (('emailAddress', 'support@cacert.org'),)),
1438 'version': 3}])
1439
Martin Panterb55f8b72016-01-14 12:53:56 +00001440 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001441 pem = f.read()
1442 der = ssl.PEM_cert_to_DER_cert(pem)
1443 self.assertEqual(ctx.get_ca_certs(True), [der])
1444
Christian Heimes72d28502013-11-23 13:56:58 +01001445 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001446 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001447 ctx.load_default_certs()
1448
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001450 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1451 ctx.load_default_certs()
1452
Christian Heimesa170fa12017-09-15 20:27:30 +02001453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001454 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1455
Christian Heimesa170fa12017-09-15 20:27:30 +02001456 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001457 self.assertRaises(TypeError, ctx.load_default_certs, None)
1458 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1459
Benjamin Peterson91244e02014-10-03 18:17:15 -04001460 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001461 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001462 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001463 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001464 with support.EnvironmentVarGuard() as env:
1465 env["SSL_CERT_DIR"] = CAPATH
1466 env["SSL_CERT_FILE"] = CERTFILE
1467 ctx.load_default_certs()
1468 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1469
Benjamin Peterson91244e02014-10-03 18:17:15 -04001470 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001471 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001472 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001473 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001474 ctx.load_default_certs()
1475 stats = ctx.cert_store_stats()
1476
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001478 with support.EnvironmentVarGuard() as env:
1479 env["SSL_CERT_DIR"] = CAPATH
1480 env["SSL_CERT_FILE"] = CERTFILE
1481 ctx.load_default_certs()
1482 stats["x509"] += 1
1483 self.assertEqual(ctx.cert_store_stats(), stats)
1484
Christian Heimes358cfd42016-09-10 22:43:48 +02001485 def _assert_context_options(self, ctx):
1486 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1487 if OP_NO_COMPRESSION != 0:
1488 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1489 OP_NO_COMPRESSION)
1490 if OP_SINGLE_DH_USE != 0:
1491 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1492 OP_SINGLE_DH_USE)
1493 if OP_SINGLE_ECDH_USE != 0:
1494 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1495 OP_SINGLE_ECDH_USE)
1496 if OP_CIPHER_SERVER_PREFERENCE != 0:
1497 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1498 OP_CIPHER_SERVER_PREFERENCE)
1499
Christian Heimes4c05b472013-11-23 15:58:30 +01001500 def test_create_default_context(self):
1501 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001502
Christian Heimesa170fa12017-09-15 20:27:30 +02001503 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001504 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001505 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001506 self._assert_context_options(ctx)
1507
Christian Heimes4c05b472013-11-23 15:58:30 +01001508 with open(SIGNING_CA) as f:
1509 cadata = f.read()
1510 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1511 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001512 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001513 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001514 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001515
1516 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001517 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001518 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001519 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001520
Christian Heimes67986f92013-11-23 22:43:47 +01001521 def test__create_stdlib_context(self):
1522 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001523 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001524 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001525 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001526 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001527
1528 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1529 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1530 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001531 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001532
1533 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001534 cert_reqs=ssl.CERT_REQUIRED,
1535 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001536 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1537 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001538 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001539 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001540
1541 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001543 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001544 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001545
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001547 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001548 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001549 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001550
Christian Heimese82c0342017-09-15 20:29:57 +02001551 # Auto set CERT_REQUIRED
1552 ctx.check_hostname = True
1553 self.assertTrue(ctx.check_hostname)
1554 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1555 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001556 ctx.verify_mode = ssl.CERT_REQUIRED
1557 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001558 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001559
Christian Heimese82c0342017-09-15 20:29:57 +02001560 # Changing verify_mode does not affect check_hostname
1561 ctx.check_hostname = False
1562 ctx.verify_mode = ssl.CERT_NONE
1563 ctx.check_hostname = False
1564 self.assertFalse(ctx.check_hostname)
1565 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1566 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 ctx.check_hostname = True
1568 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001569 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1570
1571 ctx.check_hostname = False
1572 ctx.verify_mode = ssl.CERT_OPTIONAL
1573 ctx.check_hostname = False
1574 self.assertFalse(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1576 # keep CERT_OPTIONAL
1577 ctx.check_hostname = True
1578 self.assertTrue(ctx.check_hostname)
1579 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001580
1581 # Cannot set CERT_NONE with check_hostname enabled
1582 with self.assertRaises(ValueError):
1583 ctx.verify_mode = ssl.CERT_NONE
1584 ctx.check_hostname = False
1585 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001586 ctx.verify_mode = ssl.CERT_NONE
1587 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588
Christian Heimes5fe668c2016-09-12 00:01:11 +02001589 def test_context_client_server(self):
1590 # PROTOCOL_TLS_CLIENT has sane defaults
1591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1592 self.assertTrue(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1594
1595 # PROTOCOL_TLS_SERVER has different but also sane defaults
1596 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1597 self.assertFalse(ctx.check_hostname)
1598 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1599
Christian Heimes4df60f12017-09-15 20:26:05 +02001600 def test_context_custom_class(self):
1601 class MySSLSocket(ssl.SSLSocket):
1602 pass
1603
1604 class MySSLObject(ssl.SSLObject):
1605 pass
1606
1607 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1608 ctx.sslsocket_class = MySSLSocket
1609 ctx.sslobject_class = MySSLObject
1610
1611 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1612 self.assertIsInstance(sock, MySSLSocket)
1613 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1614 self.assertIsInstance(obj, MySSLObject)
1615
Antoine Pitrou152efa22010-05-16 18:19:27 +00001616
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001617class SSLErrorTests(unittest.TestCase):
1618
1619 def test_str(self):
1620 # The str() of a SSLError doesn't include the errno
1621 e = ssl.SSLError(1, "foo")
1622 self.assertEqual(str(e), "foo")
1623 self.assertEqual(e.errno, 1)
1624 # Same for a subclass
1625 e = ssl.SSLZeroReturnError(1, "foo")
1626 self.assertEqual(str(e), "foo")
1627 self.assertEqual(e.errno, 1)
1628
1629 def test_lib_reason(self):
1630 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001631 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001632 with self.assertRaises(ssl.SSLError) as cm:
1633 ctx.load_dh_params(CERTFILE)
1634 self.assertEqual(cm.exception.library, 'PEM')
1635 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1636 s = str(cm.exception)
1637 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1638
1639 def test_subclass(self):
1640 # Check that the appropriate SSLError subclass is raised
1641 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001642 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1643 ctx.check_hostname = False
1644 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001645 with socket.socket() as s:
1646 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001647 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001648 c = socket.socket()
1649 c.connect(s.getsockname())
1650 c.setblocking(False)
1651 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001652 with self.assertRaises(ssl.SSLWantReadError) as cm:
1653 c.do_handshake()
1654 s = str(cm.exception)
1655 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1656 # For compatibility
1657 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1658
1659
Christian Heimes61d478c2018-01-27 15:51:38 +01001660 def test_bad_server_hostname(self):
1661 ctx = ssl.create_default_context()
1662 with self.assertRaises(ValueError):
1663 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1664 server_hostname="")
1665 with self.assertRaises(ValueError):
1666 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1667 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001668 with self.assertRaises(TypeError):
1669 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1670 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001671
1672
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001673class MemoryBIOTests(unittest.TestCase):
1674
1675 def test_read_write(self):
1676 bio = ssl.MemoryBIO()
1677 bio.write(b'foo')
1678 self.assertEqual(bio.read(), b'foo')
1679 self.assertEqual(bio.read(), b'')
1680 bio.write(b'foo')
1681 bio.write(b'bar')
1682 self.assertEqual(bio.read(), b'foobar')
1683 self.assertEqual(bio.read(), b'')
1684 bio.write(b'baz')
1685 self.assertEqual(bio.read(2), b'ba')
1686 self.assertEqual(bio.read(1), b'z')
1687 self.assertEqual(bio.read(1), b'')
1688
1689 def test_eof(self):
1690 bio = ssl.MemoryBIO()
1691 self.assertFalse(bio.eof)
1692 self.assertEqual(bio.read(), b'')
1693 self.assertFalse(bio.eof)
1694 bio.write(b'foo')
1695 self.assertFalse(bio.eof)
1696 bio.write_eof()
1697 self.assertFalse(bio.eof)
1698 self.assertEqual(bio.read(2), b'fo')
1699 self.assertFalse(bio.eof)
1700 self.assertEqual(bio.read(1), b'o')
1701 self.assertTrue(bio.eof)
1702 self.assertEqual(bio.read(), b'')
1703 self.assertTrue(bio.eof)
1704
1705 def test_pending(self):
1706 bio = ssl.MemoryBIO()
1707 self.assertEqual(bio.pending, 0)
1708 bio.write(b'foo')
1709 self.assertEqual(bio.pending, 3)
1710 for i in range(3):
1711 bio.read(1)
1712 self.assertEqual(bio.pending, 3-i-1)
1713 for i in range(3):
1714 bio.write(b'x')
1715 self.assertEqual(bio.pending, i+1)
1716 bio.read()
1717 self.assertEqual(bio.pending, 0)
1718
1719 def test_buffer_types(self):
1720 bio = ssl.MemoryBIO()
1721 bio.write(b'foo')
1722 self.assertEqual(bio.read(), b'foo')
1723 bio.write(bytearray(b'bar'))
1724 self.assertEqual(bio.read(), b'bar')
1725 bio.write(memoryview(b'baz'))
1726 self.assertEqual(bio.read(), b'baz')
1727
1728 def test_error_types(self):
1729 bio = ssl.MemoryBIO()
1730 self.assertRaises(TypeError, bio.write, 'foo')
1731 self.assertRaises(TypeError, bio.write, None)
1732 self.assertRaises(TypeError, bio.write, True)
1733 self.assertRaises(TypeError, bio.write, 1)
1734
1735
Christian Heimes89c20512018-02-27 11:17:32 +01001736class SSLObjectTests(unittest.TestCase):
1737 def test_private_init(self):
1738 bio = ssl.MemoryBIO()
1739 with self.assertRaisesRegex(TypeError, "public constructor"):
1740 ssl.SSLObject(bio, bio)
1741
Miss Islington (bot)c00f7032018-09-21 22:00:42 -07001742 def test_unwrap(self):
1743 client_ctx, server_ctx, hostname = testing_context()
1744 c_in = ssl.MemoryBIO()
1745 c_out = ssl.MemoryBIO()
1746 s_in = ssl.MemoryBIO()
1747 s_out = ssl.MemoryBIO()
1748 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1749 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1750
1751 # Loop on the handshake for a bit to get it settled
1752 for _ in range(5):
1753 try:
1754 client.do_handshake()
1755 except ssl.SSLWantReadError:
1756 pass
1757 if c_out.pending:
1758 s_in.write(c_out.read())
1759 try:
1760 server.do_handshake()
1761 except ssl.SSLWantReadError:
1762 pass
1763 if s_out.pending:
1764 c_in.write(s_out.read())
1765 # Now the handshakes should be complete (don't raise WantReadError)
1766 client.do_handshake()
1767 server.do_handshake()
1768
1769 # Now if we unwrap one side unilaterally, it should send close-notify
1770 # and raise WantReadError:
1771 with self.assertRaises(ssl.SSLWantReadError):
1772 client.unwrap()
1773
1774 # But server.unwrap() does not raise, because it reads the client's
1775 # close-notify:
1776 s_in.write(c_out.read())
1777 server.unwrap()
1778
1779 # And now that the client gets the server's close-notify, it doesn't
1780 # raise either.
1781 c_in.write(s_out.read())
1782 client.unwrap()
Christian Heimes89c20512018-02-27 11:17:32 +01001783
Martin Panter3840b2a2016-03-27 01:53:46 +00001784class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 """Tests that connect to a simple server running in the background"""
1786
1787 def setUp(self):
1788 server = ThreadedEchoServer(SIGNED_CERTFILE)
1789 self.server_addr = (HOST, server.port)
1790 server.__enter__()
1791 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001792
Antoine Pitrou480a1242010-04-28 21:37:09 +00001793 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001794 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001795 cert_reqs=ssl.CERT_NONE) as s:
1796 s.connect(self.server_addr)
1797 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001798 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001799
Martin Panter3840b2a2016-03-27 01:53:46 +00001800 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001801 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001802 cert_reqs=ssl.CERT_REQUIRED,
1803 ca_certs=SIGNING_CA) as s:
1804 s.connect(self.server_addr)
1805 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001806 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001807
Martin Panter3840b2a2016-03-27 01:53:46 +00001808 def test_connect_fail(self):
1809 # This should fail because we have no verification certs. Connection
1810 # failure crashes ThreadedEchoServer, so run this in an independent
1811 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001812 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001813 cert_reqs=ssl.CERT_REQUIRED)
1814 self.addCleanup(s.close)
1815 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1816 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001817
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001818 def test_connect_ex(self):
1819 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001820 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001821 cert_reqs=ssl.CERT_REQUIRED,
1822 ca_certs=SIGNING_CA)
1823 self.addCleanup(s.close)
1824 self.assertEqual(0, s.connect_ex(self.server_addr))
1825 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001826
1827 def test_non_blocking_connect_ex(self):
1828 # Issue #11326: non-blocking connect_ex() should allow handshake
1829 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001830 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001831 cert_reqs=ssl.CERT_REQUIRED,
1832 ca_certs=SIGNING_CA,
1833 do_handshake_on_connect=False)
1834 self.addCleanup(s.close)
1835 s.setblocking(False)
1836 rc = s.connect_ex(self.server_addr)
1837 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1838 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1839 # Wait for connect to finish
1840 select.select([], [s], [], 5.0)
1841 # Non-blocking handshake
1842 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001843 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001844 s.do_handshake()
1845 break
1846 except ssl.SSLWantReadError:
1847 select.select([s], [], [], 5.0)
1848 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001849 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001850 # SSL established
1851 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001852
Antoine Pitrou152efa22010-05-16 18:19:27 +00001853 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001854 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001855 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001856 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1857 s.connect(self.server_addr)
1858 self.assertEqual({}, s.getpeercert())
1859 # Same with a server hostname
1860 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1861 server_hostname="dummy") as s:
1862 s.connect(self.server_addr)
1863 ctx.verify_mode = ssl.CERT_REQUIRED
1864 # This should succeed because we specify the root cert
1865 ctx.load_verify_locations(SIGNING_CA)
1866 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1867 s.connect(self.server_addr)
1868 cert = s.getpeercert()
1869 self.assertTrue(cert)
1870
1871 def test_connect_with_context_fail(self):
1872 # This should fail because we have no verification certs. Connection
1873 # failure crashes ThreadedEchoServer, so run this in an independent
1874 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001875 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001876 ctx.verify_mode = ssl.CERT_REQUIRED
1877 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1878 self.addCleanup(s.close)
1879 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1880 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001881
1882 def test_connect_capath(self):
1883 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001884 # NOTE: the subject hashing algorithm has been changed between
1885 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1886 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001887 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001888 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001889 ctx.verify_mode = ssl.CERT_REQUIRED
1890 ctx.load_verify_locations(capath=CAPATH)
1891 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1892 s.connect(self.server_addr)
1893 cert = s.getpeercert()
1894 self.assertTrue(cert)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07001895
Martin Panter3840b2a2016-03-27 01:53:46 +00001896 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001897 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001898 ctx.verify_mode = ssl.CERT_REQUIRED
1899 ctx.load_verify_locations(capath=BYTES_CAPATH)
1900 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1901 s.connect(self.server_addr)
1902 cert = s.getpeercert()
1903 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001904
Christian Heimesefff7062013-11-21 03:35:02 +01001905 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001907 pem = f.read()
1908 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001909 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001910 ctx.verify_mode = ssl.CERT_REQUIRED
1911 ctx.load_verify_locations(cadata=pem)
1912 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1913 s.connect(self.server_addr)
1914 cert = s.getpeercert()
1915 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001916
Martin Panter3840b2a2016-03-27 01:53:46 +00001917 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001918 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001919 ctx.verify_mode = ssl.CERT_REQUIRED
1920 ctx.load_verify_locations(cadata=der)
1921 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1922 s.connect(self.server_addr)
1923 cert = s.getpeercert()
1924 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001925
Antoine Pitroue3220242010-04-24 11:13:53 +00001926 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1927 def test_makefile_close(self):
1928 # Issue #5238: creating a file-like object with makefile() shouldn't
1929 # delay closing the underlying "real socket" (here tested with its
1930 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001931 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001932 ss.connect(self.server_addr)
1933 fd = ss.fileno()
1934 f = ss.makefile()
1935 f.close()
1936 # The fd is still open
1937 os.read(fd, 0)
1938 # Closing the SSL socket should close the fd too
1939 ss.close()
1940 gc.collect()
1941 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001942 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001943 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001944
Antoine Pitrou480a1242010-04-28 21:37:09 +00001945 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001946 s = socket.socket(socket.AF_INET)
1947 s.connect(self.server_addr)
1948 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001949 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001950 cert_reqs=ssl.CERT_NONE,
1951 do_handshake_on_connect=False)
1952 self.addCleanup(s.close)
1953 count = 0
1954 while True:
1955 try:
1956 count += 1
1957 s.do_handshake()
1958 break
1959 except ssl.SSLWantReadError:
1960 select.select([s], [], [])
1961 except ssl.SSLWantWriteError:
1962 select.select([], [s], [])
1963 if support.verbose:
1964 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001965
Antoine Pitrou480a1242010-04-28 21:37:09 +00001966 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001967 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001968
Martin Panter3840b2a2016-03-27 01:53:46 +00001969 def test_get_server_certificate_fail(self):
1970 # Connection failure crashes ThreadedEchoServer, so run this in an
1971 # independent test method
1972 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001973
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001974 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001975 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001976 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1977 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001978 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001979 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1980 s.connect(self.server_addr)
1981 # Error checking can happen at instantiation or when connecting
1982 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1983 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001984 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1986 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001987
Christian Heimes9a5395a2013-06-17 15:44:12 +02001988 def test_get_ca_certs_capath(self):
1989 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001990 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 ctx.load_verify_locations(capath=CAPATH)
1992 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001993 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1994 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001995 s.connect(self.server_addr)
1996 cert = s.getpeercert()
1997 self.assertTrue(cert)
1998 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001999
Christian Heimes575596e2013-12-15 21:49:17 +01002000 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002001 def test_context_setget(self):
2002 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002003 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2004 ctx1.load_verify_locations(capath=CAPATH)
2005 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2006 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002007 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002008 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002009 ss.connect(self.server_addr)
2010 self.assertIs(ss.context, ctx1)
2011 self.assertIs(ss._sslobj.context, ctx1)
2012 ss.context = ctx2
2013 self.assertIs(ss.context, ctx2)
2014 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002015
2016 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2017 # A simple IO loop. Call func(*args) depending on the error we get
2018 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2019 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002020 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002021 count = 0
2022 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002023 if time.monotonic() > deadline:
2024 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002025 errno = None
2026 count += 1
2027 try:
2028 ret = func(*args)
2029 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002030 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002031 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002032 raise
2033 errno = e.errno
2034 # Get any data from the outgoing BIO irrespective of any error, and
2035 # send it to the socket.
2036 buf = outgoing.read()
2037 sock.sendall(buf)
2038 # If there's no error, we're done. For WANT_READ, we need to get
2039 # data from the socket and put it in the incoming BIO.
2040 if errno is None:
2041 break
2042 elif errno == ssl.SSL_ERROR_WANT_READ:
2043 buf = sock.recv(32768)
2044 if buf:
2045 incoming.write(buf)
2046 else:
2047 incoming.write_eof()
2048 if support.verbose:
2049 sys.stdout.write("Needed %d calls to complete %s().\n"
2050 % (count, func.__name__))
2051 return ret
2052
Martin Panter3840b2a2016-03-27 01:53:46 +00002053 def test_bio_handshake(self):
2054 sock = socket.socket(socket.AF_INET)
2055 self.addCleanup(sock.close)
2056 sock.connect(self.server_addr)
2057 incoming = ssl.MemoryBIO()
2058 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2060 self.assertTrue(ctx.check_hostname)
2061 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002062 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002063 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2064 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002065 self.assertIs(sslobj._sslobj.owner, sslobj)
2066 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002067 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002068 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002069 self.assertRaises(ValueError, sslobj.getpeercert)
2070 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2071 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2072 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2073 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002074 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002075 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002076 self.assertTrue(sslobj.getpeercert())
2077 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2078 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2079 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002080 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002081 except ssl.SSLSyscallError:
2082 # If the server shuts down the TCP connection without sending a
2083 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2084 pass
2085 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2086
2087 def test_bio_read_write_data(self):
2088 sock = socket.socket(socket.AF_INET)
2089 self.addCleanup(sock.close)
2090 sock.connect(self.server_addr)
2091 incoming = ssl.MemoryBIO()
2092 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002093 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002094 ctx.verify_mode = ssl.CERT_NONE
2095 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2096 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2097 req = b'FOO\n'
2098 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2099 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2100 self.assertEqual(buf, b'foo\n')
2101 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002102
2103
Martin Panter3840b2a2016-03-27 01:53:46 +00002104class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002105
Martin Panter3840b2a2016-03-27 01:53:46 +00002106 def test_timeout_connect_ex(self):
2107 # Issue #12065: on a timeout, connect_ex() should return the original
2108 # errno (mimicking the behaviour of non-SSL sockets).
2109 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002110 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002111 cert_reqs=ssl.CERT_REQUIRED,
2112 do_handshake_on_connect=False)
2113 self.addCleanup(s.close)
2114 s.settimeout(0.0000001)
2115 rc = s.connect_ex((REMOTE_HOST, 443))
2116 if rc == 0:
2117 self.skipTest("REMOTE_HOST responded too quickly")
2118 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2119
2120 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2121 def test_get_server_certificate_ipv6(self):
2122 with support.transient_internet('ipv6.google.com'):
2123 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2124 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2125
Martin Panter3840b2a2016-03-27 01:53:46 +00002126
2127def _test_get_server_certificate(test, host, port, cert=None):
2128 pem = ssl.get_server_certificate((host, port))
2129 if not pem:
2130 test.fail("No server certificate on %s:%s!" % (host, port))
2131
2132 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2133 if not pem:
2134 test.fail("No server certificate on %s:%s!" % (host, port))
2135 if support.verbose:
2136 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2137
2138def _test_get_server_certificate_fail(test, host, port):
2139 try:
2140 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2141 except ssl.SSLError as x:
2142 #should fail
2143 if support.verbose:
2144 sys.stdout.write("%s\n" % x)
2145 else:
2146 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2147
2148
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002149from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002150
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002151class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002152
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002153 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002154
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002155 """A mildly complicated class, because we want it to work both
2156 with and without the SSL wrapper around the socket connection, so
2157 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002158
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002159 def __init__(self, server, connsock, addr):
2160 self.server = server
2161 self.running = False
2162 self.sock = connsock
2163 self.addr = addr
2164 self.sock.setblocking(1)
2165 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002166 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002167 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002169 def wrap_conn(self):
2170 try:
2171 self.sslconn = self.server.context.wrap_socket(
2172 self.sock, server_side=True)
2173 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2174 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002175 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002176 # We treat ConnectionResetError as though it were an
2177 # SSLError - OpenSSL on Ubuntu abruptly closes the
2178 # connection when asked to use an unsupported protocol.
2179 #
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002180 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2181 # tries to send session tickets after handshake.
2182 # https://github.com/openssl/openssl/issues/6342
2183 self.server.conn_errors.append(str(e))
2184 if self.server.chatty:
2185 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2186 self.running = False
2187 self.close()
2188 return False
2189 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002190 # OSError may occur with wrong protocols, e.g. both
2191 # sides use PROTOCOL_TLS_SERVER.
2192 #
2193 # XXX Various errors can have happened here, for example
2194 # a mismatching protocol version, an invalid certificate,
2195 # or a low-level bug. This should be made more discriminating.
2196 #
2197 # bpo-31323: Store the exception as string to prevent
2198 # a reference leak: server -> conn_errors -> exception
2199 # -> traceback -> self (ConnectionHandler) -> server
2200 self.server.conn_errors.append(str(e))
2201 if self.server.chatty:
2202 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2203 self.running = False
2204 self.server.stop()
2205 self.close()
2206 return False
2207 else:
2208 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2209 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2210 cert = self.sslconn.getpeercert()
2211 if support.verbose and self.server.chatty:
2212 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2213 cert_binary = self.sslconn.getpeercert(True)
2214 if support.verbose and self.server.chatty:
2215 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2216 cipher = self.sslconn.cipher()
2217 if support.verbose and self.server.chatty:
2218 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2219 sys.stdout.write(" server: selected protocol is now "
2220 + str(self.sslconn.selected_npn_protocol()) + "\n")
2221 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002222
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002223 def read(self):
2224 if self.sslconn:
2225 return self.sslconn.read()
2226 else:
2227 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002228
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002229 def write(self, bytes):
2230 if self.sslconn:
2231 return self.sslconn.write(bytes)
2232 else:
2233 return self.sock.send(bytes)
2234
2235 def close(self):
2236 if self.sslconn:
2237 self.sslconn.close()
2238 else:
2239 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002240
Antoine Pitrou480a1242010-04-28 21:37:09 +00002241 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002242 self.running = True
2243 if not self.server.starttls_server:
2244 if not self.wrap_conn():
2245 return
2246 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002247 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002248 msg = self.read()
2249 stripped = msg.strip()
2250 if not stripped:
2251 # eof, so quit this handler
2252 self.running = False
2253 try:
2254 self.sock = self.sslconn.unwrap()
2255 except OSError:
2256 # Many tests shut the TCP connection down
2257 # without an SSL shutdown. This causes
2258 # unwrap() to raise OSError with errno=0!
2259 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002260 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002261 self.sslconn = None
2262 self.close()
2263 elif stripped == b'over':
2264 if support.verbose and self.server.connectionchatty:
2265 sys.stdout.write(" server: client closed connection\n")
2266 self.close()
2267 return
2268 elif (self.server.starttls_server and
2269 stripped == b'STARTTLS'):
2270 if support.verbose and self.server.connectionchatty:
2271 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2272 self.write(b"OK\n")
2273 if not self.wrap_conn():
2274 return
2275 elif (self.server.starttls_server and self.sslconn
2276 and stripped == b'ENDTLS'):
2277 if support.verbose and self.server.connectionchatty:
2278 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2279 self.write(b"OK\n")
2280 self.sock = self.sslconn.unwrap()
2281 self.sslconn = None
2282 if support.verbose and self.server.connectionchatty:
2283 sys.stdout.write(" server: connection is now unencrypted...\n")
2284 elif stripped == b'CB tls-unique':
2285 if support.verbose and self.server.connectionchatty:
2286 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2287 data = self.sslconn.get_channel_binding("tls-unique")
2288 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes2756ef32018-09-23 09:22:52 +02002289 elif stripped == b'PHA':
2290 if support.verbose and self.server.connectionchatty:
2291 sys.stdout.write(" server: initiating post handshake auth\n")
2292 try:
2293 self.sslconn.verify_client_post_handshake()
2294 except ssl.SSLError as e:
2295 self.write(repr(e).encode("us-ascii") + b"\n")
2296 else:
2297 self.write(b"OK\n")
2298 elif stripped == b'HASCERT':
2299 if self.sslconn.getpeercert() is not None:
2300 self.write(b'TRUE\n')
2301 else:
2302 self.write(b'FALSE\n')
2303 elif stripped == b'GETCERT':
2304 cert = self.sslconn.getpeercert()
2305 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002306 else:
2307 if (support.verbose and
2308 self.server.connectionchatty):
2309 ctype = (self.sslconn and "encrypted") or "unencrypted"
2310 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2311 % (msg, ctype, msg.lower(), ctype))
2312 self.write(msg.lower())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002313 except ConnectionResetError:
2314 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2315 # when connection is not shut down gracefully.
2316 if self.server.chatty and support.verbose:
2317 sys.stdout.write(
2318 " Connection reset by peer: {}\n".format(
2319 self.addr)
2320 )
2321 self.close()
2322 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002323 except OSError:
2324 if self.server.chatty:
2325 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002326 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 self.running = False
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002328
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002329 # normally, we'd just stop here, but for the test
2330 # harness, we want to stop the server
2331 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002332
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002333 def __init__(self, certificate=None, ssl_version=None,
2334 certreqs=None, cacerts=None,
2335 chatty=True, connectionchatty=False, starttls_server=False,
2336 npn_protocols=None, alpn_protocols=None,
2337 ciphers=None, context=None):
2338 if context:
2339 self.context = context
2340 else:
2341 self.context = ssl.SSLContext(ssl_version
2342 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002343 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002344 self.context.verify_mode = (certreqs if certreqs is not None
2345 else ssl.CERT_NONE)
2346 if cacerts:
2347 self.context.load_verify_locations(cacerts)
2348 if certificate:
2349 self.context.load_cert_chain(certificate)
2350 if npn_protocols:
2351 self.context.set_npn_protocols(npn_protocols)
2352 if alpn_protocols:
2353 self.context.set_alpn_protocols(alpn_protocols)
2354 if ciphers:
2355 self.context.set_ciphers(ciphers)
2356 self.chatty = chatty
2357 self.connectionchatty = connectionchatty
2358 self.starttls_server = starttls_server
2359 self.sock = socket.socket()
2360 self.port = support.bind_port(self.sock)
2361 self.flag = None
2362 self.active = False
2363 self.selected_npn_protocols = []
2364 self.selected_alpn_protocols = []
2365 self.shared_ciphers = []
2366 self.conn_errors = []
2367 threading.Thread.__init__(self)
2368 self.daemon = True
2369
2370 def __enter__(self):
2371 self.start(threading.Event())
2372 self.flag.wait()
2373 return self
2374
2375 def __exit__(self, *args):
2376 self.stop()
2377 self.join()
2378
2379 def start(self, flag=None):
2380 self.flag = flag
2381 threading.Thread.start(self)
2382
2383 def run(self):
2384 self.sock.settimeout(0.05)
2385 self.sock.listen()
2386 self.active = True
2387 if self.flag:
2388 # signal an event
2389 self.flag.set()
2390 while self.active:
2391 try:
2392 newconn, connaddr = self.sock.accept()
2393 if support.verbose and self.chatty:
2394 sys.stdout.write(' server: new connection from '
2395 + repr(connaddr) + '\n')
2396 handler = self.ConnectionHandler(self, newconn, connaddr)
2397 handler.start()
2398 handler.join()
2399 except socket.timeout:
2400 pass
2401 except KeyboardInterrupt:
2402 self.stop()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002403 except BaseException as e:
2404 if support.verbose and self.chatty:
2405 sys.stdout.write(
2406 ' connection handling failed: ' + repr(e) + '\n')
2407
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002408 self.sock.close()
2409
2410 def stop(self):
2411 self.active = False
2412
2413class AsyncoreEchoServer(threading.Thread):
2414
2415 # this one's based on asyncore.dispatcher
2416
2417 class EchoServer (asyncore.dispatcher):
2418
2419 class ConnectionHandler(asyncore.dispatcher_with_send):
2420
2421 def __init__(self, conn, certfile):
2422 self.socket = test_wrap_socket(conn, server_side=True,
2423 certfile=certfile,
2424 do_handshake_on_connect=False)
2425 asyncore.dispatcher_with_send.__init__(self, self.socket)
2426 self._ssl_accepting = True
2427 self._do_ssl_handshake()
2428
2429 def readable(self):
2430 if isinstance(self.socket, ssl.SSLSocket):
2431 while self.socket.pending() > 0:
2432 self.handle_read_event()
2433 return True
2434
2435 def _do_ssl_handshake(self):
2436 try:
2437 self.socket.do_handshake()
2438 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2439 return
2440 except ssl.SSLEOFError:
2441 return self.handle_close()
2442 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002443 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002444 except OSError as err:
2445 if err.args[0] == errno.ECONNABORTED:
2446 return self.handle_close()
2447 else:
2448 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002449
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002450 def handle_read(self):
2451 if self._ssl_accepting:
2452 self._do_ssl_handshake()
2453 else:
2454 data = self.recv(1024)
2455 if support.verbose:
2456 sys.stdout.write(" server: read %s from client\n" % repr(data))
2457 if not data:
2458 self.close()
2459 else:
2460 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002461
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002462 def handle_close(self):
2463 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002464 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002465 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002466
2467 def handle_error(self):
2468 raise
2469
Trent Nelson78520002008-04-10 20:54:35 +00002470 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002471 self.certfile = certfile
2472 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2473 self.port = support.bind_port(sock, '')
2474 asyncore.dispatcher.__init__(self, sock)
2475 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002476
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002477 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002478 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2480 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002481
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002482 def handle_error(self):
2483 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002484
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002485 def __init__(self, certfile):
2486 self.flag = None
2487 self.active = False
2488 self.server = self.EchoServer(certfile)
2489 self.port = self.server.port
2490 threading.Thread.__init__(self)
2491 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002492
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002493 def __str__(self):
2494 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002495
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002496 def __enter__(self):
2497 self.start(threading.Event())
2498 self.flag.wait()
2499 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002500
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002501 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002502 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002503 sys.stdout.write(" cleanup: stopping server.\n")
2504 self.stop()
2505 if support.verbose:
2506 sys.stdout.write(" cleanup: joining server thread.\n")
2507 self.join()
2508 if support.verbose:
2509 sys.stdout.write(" cleanup: successfully joined.\n")
2510 # make sure that ConnectionHandler is removed from socket_map
2511 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002512
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002513 def start (self, flag=None):
2514 self.flag = flag
2515 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002516
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002517 def run(self):
2518 self.active = True
2519 if self.flag:
2520 self.flag.set()
2521 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002522 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002523 asyncore.loop(1)
2524 except:
2525 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002526
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 def stop(self):
2528 self.active = False
2529 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002530
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002531def server_params_test(client_context, server_context, indata=b"FOO\n",
2532 chatty=True, connectionchatty=False, sni_name=None,
2533 session=None):
2534 """
2535 Launch a server, connect a client to it and try various reads
2536 and writes.
2537 """
2538 stats = {}
2539 server = ThreadedEchoServer(context=server_context,
2540 chatty=chatty,
2541 connectionchatty=False)
2542 with server:
2543 with client_context.wrap_socket(socket.socket(),
2544 server_hostname=sni_name, session=session) as s:
2545 s.connect((HOST, server.port))
2546 for arg in [indata, bytearray(indata), memoryview(indata)]:
2547 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002548 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002549 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002550 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002551 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002552 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002553 if connectionchatty:
2554 if support.verbose:
2555 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002556 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002558 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2559 % (outdata[:20], len(outdata),
2560 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 s.write(b"over\n")
2562 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002563 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002564 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002565 stats.update({
2566 'compression': s.compression(),
2567 'cipher': s.cipher(),
2568 'peercert': s.getpeercert(),
2569 'client_alpn_protocol': s.selected_alpn_protocol(),
2570 'client_npn_protocol': s.selected_npn_protocol(),
2571 'version': s.version(),
2572 'session_reused': s.session_reused,
2573 'session': s.session,
2574 })
2575 s.close()
2576 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2577 stats['server_npn_protocols'] = server.selected_npn_protocols
2578 stats['server_shared_ciphers'] = server.shared_ciphers
2579 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002580
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581def try_protocol_combo(server_protocol, client_protocol, expect_success,
2582 certsreqs=None, server_options=0, client_options=0):
2583 """
2584 Try to SSL-connect using *client_protocol* to *server_protocol*.
2585 If *expect_success* is true, assert that the connection succeeds,
2586 if it's false, assert that the connection fails.
2587 Also, if *expect_success* is a string, assert that it is the protocol
2588 version actually used by the connection.
2589 """
2590 if certsreqs is None:
2591 certsreqs = ssl.CERT_NONE
2592 certtype = {
2593 ssl.CERT_NONE: "CERT_NONE",
2594 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2595 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2596 }[certsreqs]
2597 if support.verbose:
2598 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2599 sys.stdout.write(formatstr %
2600 (ssl.get_protocol_name(client_protocol),
2601 ssl.get_protocol_name(server_protocol),
2602 certtype))
2603 client_context = ssl.SSLContext(client_protocol)
2604 client_context.options |= client_options
2605 server_context = ssl.SSLContext(server_protocol)
2606 server_context.options |= server_options
2607
2608 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2609 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2610 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002611 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002612 client_context.set_ciphers("ALL")
2613
2614 for ctx in (client_context, server_context):
2615 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002616 ctx.load_cert_chain(SIGNED_CERTFILE)
2617 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002618 try:
2619 stats = server_params_test(client_context, server_context,
2620 chatty=False, connectionchatty=False)
2621 # Protocol mismatch can result in either an SSLError, or a
2622 # "Connection reset by peer" error.
2623 except ssl.SSLError:
2624 if expect_success:
2625 raise
2626 except OSError as e:
2627 if expect_success or e.errno != errno.ECONNRESET:
2628 raise
2629 else:
2630 if not expect_success:
2631 raise AssertionError(
2632 "Client protocol %s succeeded with server protocol %s!"
2633 % (ssl.get_protocol_name(client_protocol),
2634 ssl.get_protocol_name(server_protocol)))
2635 elif (expect_success is not True
2636 and expect_success != stats['version']):
2637 raise AssertionError("version mismatch: expected %r, got %r"
2638 % (expect_success, stats['version']))
2639
2640
2641class ThreadedTests(unittest.TestCase):
2642
2643 @skip_if_broken_ubuntu_ssl
2644 def test_echo(self):
2645 """Basic test of an SSL client connecting to a server"""
2646 if support.verbose:
2647 sys.stdout.write("\n")
2648 for protocol in PROTOCOLS:
2649 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2650 continue
2651 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2652 context = ssl.SSLContext(protocol)
2653 context.load_cert_chain(CERTFILE)
2654 server_params_test(context, context,
2655 chatty=True, connectionchatty=True)
2656
Christian Heimesa170fa12017-09-15 20:27:30 +02002657 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002658
2659 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2660 server_params_test(client_context=client_context,
2661 server_context=server_context,
2662 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002663 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664
2665 client_context.check_hostname = False
2666 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2667 with self.assertRaises(ssl.SSLError) as e:
2668 server_params_test(client_context=server_context,
2669 server_context=client_context,
2670 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002671 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 self.assertIn('called a function you should not call',
2673 str(e.exception))
2674
2675 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2676 with self.assertRaises(ssl.SSLError) as e:
2677 server_params_test(client_context=server_context,
2678 server_context=server_context,
2679 chatty=True, connectionchatty=True)
2680 self.assertIn('called a function you should not call',
2681 str(e.exception))
2682
2683 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2684 with self.assertRaises(ssl.SSLError) as e:
2685 server_params_test(client_context=server_context,
2686 server_context=client_context,
2687 chatty=True, connectionchatty=True)
2688 self.assertIn('called a function you should not call',
2689 str(e.exception))
2690
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002691 def test_getpeercert(self):
2692 if support.verbose:
2693 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002694
2695 client_context, server_context, hostname = testing_context()
2696 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002697 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002698 with client_context.wrap_socket(socket.socket(),
2699 do_handshake_on_connect=False,
2700 server_hostname=hostname) as s:
2701 s.connect((HOST, server.port))
2702 # getpeercert() raise ValueError while the handshake isn't
2703 # done.
2704 with self.assertRaises(ValueError):
2705 s.getpeercert()
2706 s.do_handshake()
2707 cert = s.getpeercert()
2708 self.assertTrue(cert, "Can't get peer certificate.")
2709 cipher = s.cipher()
2710 if support.verbose:
2711 sys.stdout.write(pprint.pformat(cert) + '\n')
2712 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2713 if 'subject' not in cert:
2714 self.fail("No subject field in certificate: %s." %
2715 pprint.pformat(cert))
2716 if ((('organizationName', 'Python Software Foundation'),)
2717 not in cert['subject']):
2718 self.fail(
2719 "Missing or invalid 'organizationName' field in certificate subject; "
2720 "should be 'Python Software Foundation'.")
2721 self.assertIn('notBefore', cert)
2722 self.assertIn('notAfter', cert)
2723 before = ssl.cert_time_to_seconds(cert['notBefore'])
2724 after = ssl.cert_time_to_seconds(cert['notAfter'])
2725 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002726
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002727 @unittest.skipUnless(have_verify_flags(),
2728 "verify_flags need OpenSSL > 0.9.8")
2729 def test_crl_check(self):
2730 if support.verbose:
2731 sys.stdout.write("\n")
2732
Christian Heimesa170fa12017-09-15 20:27:30 +02002733 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002734
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002735 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002736 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002737
2738 # VERIFY_DEFAULT should pass
2739 server = ThreadedEchoServer(context=server_context, chatty=True)
2740 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002741 with client_context.wrap_socket(socket.socket(),
2742 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002743 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002744 cert = s.getpeercert()
2745 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002746
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002747 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002748 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002749
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002750 server = ThreadedEchoServer(context=server_context, chatty=True)
2751 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002752 with client_context.wrap_socket(socket.socket(),
2753 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002754 with self.assertRaisesRegex(ssl.SSLError,
2755 "certificate verify failed"):
2756 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002757
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002758 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002759 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002760
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002761 server = ThreadedEchoServer(context=server_context, chatty=True)
2762 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002763 with client_context.wrap_socket(socket.socket(),
2764 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002765 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002766 cert = s.getpeercert()
2767 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002768
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 def test_check_hostname(self):
2770 if support.verbose:
2771 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002772
Christian Heimesa170fa12017-09-15 20:27:30 +02002773 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002774
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002775 # correct hostname should verify
2776 server = ThreadedEchoServer(context=server_context, chatty=True)
2777 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002778 with client_context.wrap_socket(socket.socket(),
2779 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 s.connect((HOST, server.port))
2781 cert = s.getpeercert()
2782 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002783
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002784 # incorrect hostname should raise an exception
2785 server = ThreadedEchoServer(context=server_context, chatty=True)
2786 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002787 with client_context.wrap_socket(socket.socket(),
2788 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002789 with self.assertRaisesRegex(
2790 ssl.CertificateError,
2791 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002792 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002793
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002794 # missing server_hostname arg should cause an exception, too
2795 server = ThreadedEchoServer(context=server_context, chatty=True)
2796 with server:
2797 with socket.socket() as s:
2798 with self.assertRaisesRegex(ValueError,
2799 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002800 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002801
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002802 def test_ecc_cert(self):
2803 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2804 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002805 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002806 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2807
2808 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2809 # load ECC cert
2810 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2811
2812 # correct hostname should verify
2813 server = ThreadedEchoServer(context=server_context, chatty=True)
2814 with server:
2815 with client_context.wrap_socket(socket.socket(),
2816 server_hostname=hostname) as s:
2817 s.connect((HOST, server.port))
2818 cert = s.getpeercert()
2819 self.assertTrue(cert, "Can't get peer certificate.")
2820 cipher = s.cipher()[0].split('-')
2821 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2822
2823 def test_dual_rsa_ecc(self):
2824 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2825 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002826 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2827 # algorithms.
2828 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002829 # only ECDSA certs
2830 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2831 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2832
2833 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2834 # load ECC and RSA key/cert pairs
2835 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2836 server_context.load_cert_chain(SIGNED_CERTFILE)
2837
2838 # correct hostname should verify
2839 server = ThreadedEchoServer(context=server_context, chatty=True)
2840 with server:
2841 with client_context.wrap_socket(socket.socket(),
2842 server_hostname=hostname) as s:
2843 s.connect((HOST, server.port))
2844 cert = s.getpeercert()
2845 self.assertTrue(cert, "Can't get peer certificate.")
2846 cipher = s.cipher()[0].split('-')
2847 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2848
Christian Heimes66e57422018-01-29 14:25:13 +01002849 def test_check_hostname_idn(self):
2850 if support.verbose:
2851 sys.stdout.write("\n")
2852
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002853 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002854 server_context.load_cert_chain(IDNSANSFILE)
2855
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002856 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002857 context.verify_mode = ssl.CERT_REQUIRED
2858 context.check_hostname = True
2859 context.load_verify_locations(SIGNING_CA)
2860
2861 # correct hostname should verify, when specified in several
2862 # different ways
2863 idn_hostnames = [
2864 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002865 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002866 ('xn--knig-5qa.idn.pythontest.net',
2867 'xn--knig-5qa.idn.pythontest.net'),
2868 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002869 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002870
2871 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002872 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002873 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2874 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2875 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002876 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2877
2878 # ('königsgäßchen.idna2008.pythontest.net',
2879 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2880 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2881 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2882 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2883 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2884
Christian Heimes66e57422018-01-29 14:25:13 +01002885 ]
2886 for server_hostname, expected_hostname in idn_hostnames:
2887 server = ThreadedEchoServer(context=server_context, chatty=True)
2888 with server:
2889 with context.wrap_socket(socket.socket(),
2890 server_hostname=server_hostname) as s:
2891 self.assertEqual(s.server_hostname, expected_hostname)
2892 s.connect((HOST, server.port))
2893 cert = s.getpeercert()
2894 self.assertEqual(s.server_hostname, expected_hostname)
2895 self.assertTrue(cert, "Can't get peer certificate.")
2896
Christian Heimes66e57422018-01-29 14:25:13 +01002897 # incorrect hostname should raise an exception
2898 server = ThreadedEchoServer(context=server_context, chatty=True)
2899 with server:
2900 with context.wrap_socket(socket.socket(),
2901 server_hostname="python.example.org") as s:
2902 with self.assertRaises(ssl.CertificateError):
2903 s.connect((HOST, server.port))
2904
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002905 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002906 """Connecting when the server rejects the client's certificate
2907
2908 Launch a server with CERT_REQUIRED, and check that trying to
2909 connect to it with a wrong client certificate fails.
2910 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002911 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002912 # load client cert that is not signed by trusted CA
2913 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002914 # require TLS client authentication
2915 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002916 # TLS 1.3 has different handshake
2917 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002918
2919 server = ThreadedEchoServer(
2920 context=server_context, chatty=True, connectionchatty=True,
2921 )
2922
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002923 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002924 client_context.wrap_socket(socket.socket(),
2925 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002926 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 # Expect either an SSL error about the server rejecting
2928 # the connection, or a low-level connection reset (which
2929 # sometimes happens on Windows)
2930 s.connect((HOST, server.port))
2931 except ssl.SSLError as e:
2932 if support.verbose:
2933 sys.stdout.write("\nSSLError is %r\n" % e)
2934 except OSError as e:
2935 if e.errno != errno.ECONNRESET:
2936 raise
2937 if support.verbose:
2938 sys.stdout.write("\nsocket.error is %r\n" % e)
2939 else:
2940 self.fail("Use of invalid cert should have failed!")
2941
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002942 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2943 def test_wrong_cert_tls13(self):
2944 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002945 # load client cert that is not signed by trusted CA
2946 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002947 server_context.verify_mode = ssl.CERT_REQUIRED
2948 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2949 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2950
2951 server = ThreadedEchoServer(
2952 context=server_context, chatty=True, connectionchatty=True,
2953 )
2954 with server, \
2955 client_context.wrap_socket(socket.socket(),
2956 server_hostname=hostname) as s:
2957 # TLS 1.3 perform client cert exchange after handshake
2958 s.connect((HOST, server.port))
2959 try:
2960 s.write(b'data')
2961 s.read(4)
2962 except ssl.SSLError as e:
2963 if support.verbose:
2964 sys.stdout.write("\nSSLError is %r\n" % e)
2965 except OSError as e:
2966 if e.errno != errno.ECONNRESET:
2967 raise
2968 if support.verbose:
2969 sys.stdout.write("\nsocket.error is %r\n" % e)
2970 else:
2971 self.fail("Use of invalid cert should have failed!")
2972
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 def test_rude_shutdown(self):
2974 """A brutal shutdown of an SSL server should raise an OSError
2975 in the client when attempting handshake.
2976 """
2977 listener_ready = threading.Event()
2978 listener_gone = threading.Event()
2979
2980 s = socket.socket()
2981 port = support.bind_port(s, HOST)
2982
2983 # `listener` runs in a thread. It sits in an accept() until
2984 # the main thread connects. Then it rudely closes the socket,
2985 # and sets Event `listener_gone` to let the main thread know
2986 # the socket is gone.
2987 def listener():
2988 s.listen()
2989 listener_ready.set()
2990 newsock, addr = s.accept()
2991 newsock.close()
2992 s.close()
2993 listener_gone.set()
2994
2995 def connector():
2996 listener_ready.wait()
2997 with socket.socket() as c:
2998 c.connect((HOST, port))
2999 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003000 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003001 ssl_sock = test_wrap_socket(c)
3002 except OSError:
3003 pass
3004 else:
3005 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003006
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003007 t = threading.Thread(target=listener)
3008 t.start()
3009 try:
3010 connector()
3011 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003012 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003013
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003014 def test_ssl_cert_verify_error(self):
3015 if support.verbose:
3016 sys.stdout.write("\n")
3017
3018 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3019 server_context.load_cert_chain(SIGNED_CERTFILE)
3020
3021 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3022
3023 server = ThreadedEchoServer(context=server_context, chatty=True)
3024 with server:
3025 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003026 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003027 try:
3028 s.connect((HOST, server.port))
3029 except ssl.SSLError as e:
3030 msg = 'unable to get local issuer certificate'
3031 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3032 self.assertEqual(e.verify_code, 20)
3033 self.assertEqual(e.verify_message, msg)
3034 self.assertIn(msg, repr(e))
3035 self.assertIn('certificate verify failed', repr(e))
3036
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003037 @skip_if_broken_ubuntu_ssl
3038 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3039 "OpenSSL is compiled without SSLv2 support")
3040 def test_protocol_sslv2(self):
3041 """Connecting to an SSLv2 server with various client options"""
3042 if support.verbose:
3043 sys.stdout.write("\n")
3044 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3045 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3046 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003047 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003048 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3049 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3050 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3051 # SSLv23 client with specific SSL options
3052 if no_sslv2_implies_sslv3_hello():
3053 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003054 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003055 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003056 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003057 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003058 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003059 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003060
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003061 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02003062 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003063 """Connecting to an SSLv23 server with various client options"""
3064 if support.verbose:
3065 sys.stdout.write("\n")
3066 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003067 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003068 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003069 except OSError as x:
3070 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3071 if support.verbose:
3072 sys.stdout.write(
3073 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3074 % str(x))
3075 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003076 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3077 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3078 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003079
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003080 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003081 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3082 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3083 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084
3085 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003086 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3087 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3088 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003089
3090 # Server with specific SSL options
3091 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003092 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 server_options=ssl.OP_NO_SSLv3)
3094 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003095 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003096 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003097 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003098 server_options=ssl.OP_NO_TLSv1)
3099
3100
3101 @skip_if_broken_ubuntu_ssl
3102 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3103 "OpenSSL is compiled without SSLv3 support")
3104 def test_protocol_sslv3(self):
3105 """Connecting to an SSLv3 server with various client options"""
3106 if support.verbose:
3107 sys.stdout.write("\n")
3108 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3109 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3110 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3111 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3112 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003113 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003114 client_options=ssl.OP_NO_SSLv3)
3115 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3116 if no_sslv2_implies_sslv3_hello():
3117 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003118 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003119 False, client_options=ssl.OP_NO_SSLv2)
3120
3121 @skip_if_broken_ubuntu_ssl
3122 def test_protocol_tlsv1(self):
3123 """Connecting to a TLSv1 server with various client options"""
3124 if support.verbose:
3125 sys.stdout.write("\n")
3126 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3127 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3128 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3129 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3130 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3131 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3132 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003133 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003134 client_options=ssl.OP_NO_TLSv1)
3135
3136 @skip_if_broken_ubuntu_ssl
3137 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3138 "TLS version 1.1 not supported.")
3139 def test_protocol_tlsv1_1(self):
3140 """Connecting to a TLSv1.1 server with various client options.
3141 Testing against older TLS versions."""
3142 if support.verbose:
3143 sys.stdout.write("\n")
3144 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3145 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3146 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3147 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3148 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003149 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003150 client_options=ssl.OP_NO_TLSv1_1)
3151
Christian Heimesa170fa12017-09-15 20:27:30 +02003152 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003153 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3154 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3155
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003156 @skip_if_broken_ubuntu_ssl
3157 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3158 "TLS version 1.2 not supported.")
3159 def test_protocol_tlsv1_2(self):
3160 """Connecting to a TLSv1.2 server with various client options.
3161 Testing against older TLS versions."""
3162 if support.verbose:
3163 sys.stdout.write("\n")
3164 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3165 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3166 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3167 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3168 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3169 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3170 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003171 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003172 client_options=ssl.OP_NO_TLSv1_2)
3173
Christian Heimesa170fa12017-09-15 20:27:30 +02003174 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003175 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3176 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3177 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3178 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3179
3180 def test_starttls(self):
3181 """Switching from clear text to encrypted and back again."""
3182 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3183
3184 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003185 starttls_server=True,
3186 chatty=True,
3187 connectionchatty=True)
3188 wrapped = False
3189 with server:
3190 s = socket.socket()
3191 s.setblocking(1)
3192 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003193 if support.verbose:
3194 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003195 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003196 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003197 sys.stdout.write(
3198 " client: sending %r...\n" % indata)
3199 if wrapped:
3200 conn.write(indata)
3201 outdata = conn.read()
3202 else:
3203 s.send(indata)
3204 outdata = s.recv(1024)
3205 msg = outdata.strip().lower()
3206 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3207 # STARTTLS ok, switch to secure mode
3208 if support.verbose:
3209 sys.stdout.write(
3210 " client: read %r from server, starting TLS...\n"
3211 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003212 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003213 wrapped = True
3214 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3215 # ENDTLS ok, switch back to clear text
3216 if support.verbose:
3217 sys.stdout.write(
3218 " client: read %r from server, ending TLS...\n"
3219 % msg)
3220 s = conn.unwrap()
3221 wrapped = False
3222 else:
3223 if support.verbose:
3224 sys.stdout.write(
3225 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003226 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003227 sys.stdout.write(" client: closing connection.\n")
3228 if wrapped:
3229 conn.write(b"over\n")
3230 else:
3231 s.send(b"over\n")
3232 if wrapped:
3233 conn.close()
3234 else:
3235 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003236
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003237 def test_socketserver(self):
3238 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003239 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003240 # try to connect
3241 if support.verbose:
3242 sys.stdout.write('\n')
3243 with open(CERTFILE, 'rb') as f:
3244 d1 = f.read()
3245 d2 = ''
3246 # now fetch the same data from the HTTPS server
3247 url = 'https://localhost:%d/%s' % (
3248 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003249 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003250 f = urllib.request.urlopen(url, context=context)
3251 try:
3252 dlen = f.info().get("content-length")
3253 if dlen and (int(dlen) > 0):
3254 d2 = f.read(int(dlen))
3255 if support.verbose:
3256 sys.stdout.write(
3257 " client: read %d bytes from remote server '%s'\n"
3258 % (len(d2), server))
3259 finally:
3260 f.close()
3261 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003262
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003263 def test_asyncore_server(self):
3264 """Check the example asyncore integration."""
3265 if support.verbose:
3266 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003267
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003268 indata = b"FOO\n"
3269 server = AsyncoreEchoServer(CERTFILE)
3270 with server:
3271 s = test_wrap_socket(socket.socket())
3272 s.connect(('127.0.0.1', server.port))
3273 if support.verbose:
3274 sys.stdout.write(
3275 " client: sending %r...\n" % indata)
3276 s.write(indata)
3277 outdata = s.read()
3278 if support.verbose:
3279 sys.stdout.write(" client: read %r\n" % outdata)
3280 if outdata != indata.lower():
3281 self.fail(
3282 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3283 % (outdata[:20], len(outdata),
3284 indata[:20].lower(), len(indata)))
3285 s.write(b"over\n")
3286 if support.verbose:
3287 sys.stdout.write(" client: closing connection.\n")
3288 s.close()
3289 if support.verbose:
3290 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003292 def test_recv_send(self):
3293 """Test recv(), send() and friends."""
3294 if support.verbose:
3295 sys.stdout.write("\n")
3296
3297 server = ThreadedEchoServer(CERTFILE,
3298 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003299 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 cacerts=CERTFILE,
3301 chatty=True,
3302 connectionchatty=False)
3303 with server:
3304 s = test_wrap_socket(socket.socket(),
3305 server_side=False,
3306 certfile=CERTFILE,
3307 ca_certs=CERTFILE,
3308 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003309 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 s.connect((HOST, server.port))
3311 # helper methods for standardising recv* method signatures
3312 def _recv_into():
3313 b = bytearray(b"\0"*100)
3314 count = s.recv_into(b)
3315 return b[:count]
3316
3317 def _recvfrom_into():
3318 b = bytearray(b"\0"*100)
3319 count, addr = s.recvfrom_into(b)
3320 return b[:count]
3321
3322 # (name, method, expect success?, *args, return value func)
3323 send_methods = [
3324 ('send', s.send, True, [], len),
3325 ('sendto', s.sendto, False, ["some.address"], len),
3326 ('sendall', s.sendall, True, [], lambda x: None),
3327 ]
3328 # (name, method, whether to expect success, *args)
3329 recv_methods = [
3330 ('recv', s.recv, True, []),
3331 ('recvfrom', s.recvfrom, False, ["some.address"]),
3332 ('recv_into', _recv_into, True, []),
3333 ('recvfrom_into', _recvfrom_into, False, []),
3334 ]
3335 data_prefix = "PREFIX_"
3336
3337 for (meth_name, send_meth, expect_success, args,
3338 ret_val_meth) in send_methods:
3339 indata = (data_prefix + meth_name).encode('ascii')
3340 try:
3341 ret = send_meth(indata, *args)
3342 msg = "sending with {}".format(meth_name)
3343 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3344 outdata = s.read()
3345 if outdata != indata.lower():
3346 self.fail(
3347 "While sending with <<{name:s}>> bad data "
3348 "<<{outdata:r}>> ({nout:d}) received; "
3349 "expected <<{indata:r}>> ({nin:d})\n".format(
3350 name=meth_name, outdata=outdata[:20],
3351 nout=len(outdata),
3352 indata=indata[:20], nin=len(indata)
3353 )
3354 )
3355 except ValueError as e:
3356 if expect_success:
3357 self.fail(
3358 "Failed to send with method <<{name:s}>>; "
3359 "expected to succeed.\n".format(name=meth_name)
3360 )
3361 if not str(e).startswith(meth_name):
3362 self.fail(
3363 "Method <<{name:s}>> failed with unexpected "
3364 "exception message: {exp:s}\n".format(
3365 name=meth_name, exp=e
3366 )
3367 )
3368
3369 for meth_name, recv_meth, expect_success, args in recv_methods:
3370 indata = (data_prefix + meth_name).encode('ascii')
3371 try:
3372 s.send(indata)
3373 outdata = recv_meth(*args)
3374 if outdata != indata.lower():
3375 self.fail(
3376 "While receiving with <<{name:s}>> bad data "
3377 "<<{outdata:r}>> ({nout:d}) received; "
3378 "expected <<{indata:r}>> ({nin:d})\n".format(
3379 name=meth_name, outdata=outdata[:20],
3380 nout=len(outdata),
3381 indata=indata[:20], nin=len(indata)
3382 )
3383 )
3384 except ValueError as e:
3385 if expect_success:
3386 self.fail(
3387 "Failed to receive with method <<{name:s}>>; "
3388 "expected to succeed.\n".format(name=meth_name)
3389 )
3390 if not str(e).startswith(meth_name):
3391 self.fail(
3392 "Method <<{name:s}>> failed with unexpected "
3393 "exception message: {exp:s}\n".format(
3394 name=meth_name, exp=e
3395 )
3396 )
3397 # consume data
3398 s.read()
3399
3400 # read(-1, buffer) is supported, even though read(-1) is not
3401 data = b"data"
3402 s.send(data)
3403 buffer = bytearray(len(data))
3404 self.assertEqual(s.read(-1, buffer), len(data))
3405 self.assertEqual(buffer, data)
3406
Christian Heimes888bbdc2017-09-07 14:18:21 -07003407 # sendall accepts bytes-like objects
3408 if ctypes is not None:
3409 ubyte = ctypes.c_ubyte * len(data)
3410 byteslike = ubyte.from_buffer_copy(data)
3411 s.sendall(byteslike)
3412 self.assertEqual(s.read(), data)
3413
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003414 # Make sure sendmsg et al are disallowed to avoid
3415 # inadvertent disclosure of data and/or corruption
3416 # of the encrypted data stream
Miss Islington (bot)6485aa62018-12-06 12:52:43 -08003417 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003418 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3419 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3420 self.assertRaises(NotImplementedError,
Miss Islington (bot)6485aa62018-12-06 12:52:43 -08003421 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003422 s.write(b"over\n")
3423
3424 self.assertRaises(ValueError, s.recv, -1)
3425 self.assertRaises(ValueError, s.read, -1)
3426
3427 s.close()
3428
3429 def test_recv_zero(self):
3430 server = ThreadedEchoServer(CERTFILE)
3431 server.__enter__()
3432 self.addCleanup(server.__exit__, None, None)
3433 s = socket.create_connection((HOST, server.port))
3434 self.addCleanup(s.close)
3435 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3436 self.addCleanup(s.close)
3437
3438 # recv/read(0) should return no data
3439 s.send(b"data")
3440 self.assertEqual(s.recv(0), b"")
3441 self.assertEqual(s.read(0), b"")
3442 self.assertEqual(s.read(), b"data")
3443
3444 # Should not block if the other end sends no data
3445 s.setblocking(False)
3446 self.assertEqual(s.recv(0), b"")
3447 self.assertEqual(s.recv_into(bytearray()), 0)
3448
3449 def test_nonblocking_send(self):
3450 server = ThreadedEchoServer(CERTFILE,
3451 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003452 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003453 cacerts=CERTFILE,
3454 chatty=True,
3455 connectionchatty=False)
3456 with server:
3457 s = test_wrap_socket(socket.socket(),
3458 server_side=False,
3459 certfile=CERTFILE,
3460 ca_certs=CERTFILE,
3461 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003462 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003463 s.connect((HOST, server.port))
3464 s.setblocking(False)
3465
3466 # If we keep sending data, at some point the buffers
3467 # will be full and the call will block
3468 buf = bytearray(8192)
3469 def fill_buffer():
3470 while True:
3471 s.send(buf)
3472 self.assertRaises((ssl.SSLWantWriteError,
3473 ssl.SSLWantReadError), fill_buffer)
3474
3475 # Now read all the output and discard it
3476 s.setblocking(True)
3477 s.close()
3478
3479 def test_handshake_timeout(self):
3480 # Issue #5103: SSL handshake must respect the socket timeout
3481 server = socket.socket(socket.AF_INET)
3482 host = "127.0.0.1"
3483 port = support.bind_port(server)
3484 started = threading.Event()
3485 finish = False
3486
3487 def serve():
3488 server.listen()
3489 started.set()
3490 conns = []
3491 while not finish:
3492 r, w, e = select.select([server], [], [], 0.1)
3493 if server in r:
3494 # Let the socket hang around rather than having
3495 # it closed by garbage collection.
3496 conns.append(server.accept()[0])
3497 for sock in conns:
3498 sock.close()
3499
3500 t = threading.Thread(target=serve)
3501 t.start()
3502 started.wait()
3503
3504 try:
3505 try:
3506 c = socket.socket(socket.AF_INET)
3507 c.settimeout(0.2)
3508 c.connect((host, port))
3509 # Will attempt handshake and time out
3510 self.assertRaisesRegex(socket.timeout, "timed out",
3511 test_wrap_socket, c)
3512 finally:
3513 c.close()
3514 try:
3515 c = socket.socket(socket.AF_INET)
3516 c = test_wrap_socket(c)
3517 c.settimeout(0.2)
3518 # Will attempt handshake and time out
3519 self.assertRaisesRegex(socket.timeout, "timed out",
3520 c.connect, (host, port))
3521 finally:
3522 c.close()
3523 finally:
3524 finish = True
3525 t.join()
3526 server.close()
3527
3528 def test_server_accept(self):
3529 # Issue #16357: accept() on a SSLSocket created through
3530 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003531 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003532 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003533 context.load_verify_locations(SIGNING_CA)
3534 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003535 server = socket.socket(socket.AF_INET)
3536 host = "127.0.0.1"
3537 port = support.bind_port(server)
3538 server = context.wrap_socket(server, server_side=True)
3539 self.assertTrue(server.server_side)
3540
3541 evt = threading.Event()
3542 remote = None
3543 peer = None
3544 def serve():
3545 nonlocal remote, peer
3546 server.listen()
3547 # Block on the accept and wait on the connection to close.
3548 evt.set()
3549 remote, peer = server.accept()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003550 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003551
3552 t = threading.Thread(target=serve)
3553 t.start()
3554 # Client wait until server setup and perform a connect.
3555 evt.wait()
3556 client = context.wrap_socket(socket.socket())
3557 client.connect((host, port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003558 client.send(b'data')
3559 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003560 client_addr = client.getsockname()
3561 client.close()
3562 t.join()
3563 remote.close()
3564 server.close()
3565 # Sanity checks.
3566 self.assertIsInstance(remote, ssl.SSLSocket)
3567 self.assertEqual(peer, client_addr)
3568
3569 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003570 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003571 with context.wrap_socket(socket.socket()) as sock:
3572 with self.assertRaises(OSError) as cm:
3573 sock.getpeercert()
3574 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3575
3576 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003577 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 with context.wrap_socket(socket.socket()) as sock:
3579 with self.assertRaises(OSError) as cm:
3580 sock.do_handshake()
3581 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3582
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003583 def test_no_shared_ciphers(self):
3584 client_context, server_context, hostname = testing_context()
3585 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3586 client_context.options |= ssl.OP_NO_TLSv1_3
3587 # Force different suites on client and master
3588 client_context.set_ciphers("AES128")
3589 server_context.set_ciphers("AES256")
3590 with ThreadedEchoServer(context=server_context) as server:
3591 with client_context.wrap_socket(socket.socket(),
3592 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003593 with self.assertRaises(OSError):
3594 s.connect((HOST, server.port))
3595 self.assertIn("no shared cipher", server.conn_errors[0])
3596
3597 def test_version_basic(self):
3598 """
3599 Basic tests for SSLSocket.version().
3600 More tests are done in the test_protocol_*() methods.
3601 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003602 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3603 context.check_hostname = False
3604 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003605 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003606 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003607 chatty=False) as server:
3608 with context.wrap_socket(socket.socket()) as s:
3609 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003610 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003611 s.connect((HOST, server.port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003612 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003613 self.assertEqual(s.version(), 'TLSv1.3')
3614 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003615 self.assertEqual(s.version(), 'TLSv1.2')
3616 else: # 0.9.8 to 1.0.1
3617 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003618 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003619 self.assertIs(s.version(), None)
3620
Christian Heimescb5b68a2017-09-07 18:07:00 -07003621 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3622 "test requires TLSv1.3 enabled OpenSSL")
3623 def test_tls1_3(self):
3624 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3625 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003626 context.options |= (
3627 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3628 )
3629 with ThreadedEchoServer(context=context) as server:
3630 with context.wrap_socket(socket.socket()) as s:
3631 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003632 self.assertIn(s.cipher()[0], {
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003633 'TLS_AES_256_GCM_SHA384',
3634 'TLS_CHACHA20_POLY1305_SHA256',
3635 'TLS_AES_128_GCM_SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003636 })
3637 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003638
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08003639 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3640 "required OpenSSL 1.1.0g")
3641 def test_min_max_version(self):
3642 client_context, server_context, hostname = testing_context()
3643 # client TLSv1.0 to 1.2
3644 client_context.minimum_version = ssl.TLSVersion.TLSv1
3645 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3646 # server only TLSv1.2
3647 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3648 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3649
3650 with ThreadedEchoServer(context=server_context) as server:
3651 with client_context.wrap_socket(socket.socket(),
3652 server_hostname=hostname) as s:
3653 s.connect((HOST, server.port))
3654 self.assertEqual(s.version(), 'TLSv1.2')
3655
3656 # client 1.0 to 1.2, server 1.0 to 1.1
3657 server_context.minimum_version = ssl.TLSVersion.TLSv1
3658 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3659
3660 with ThreadedEchoServer(context=server_context) as server:
3661 with client_context.wrap_socket(socket.socket(),
3662 server_hostname=hostname) as s:
3663 s.connect((HOST, server.port))
3664 self.assertEqual(s.version(), 'TLSv1.1')
3665
3666 # client 1.0, server 1.2 (mismatch)
3667 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3668 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3669 client_context.minimum_version = ssl.TLSVersion.TLSv1
3670 client_context.maximum_version = ssl.TLSVersion.TLSv1
3671 with ThreadedEchoServer(context=server_context) as server:
3672 with client_context.wrap_socket(socket.socket(),
3673 server_hostname=hostname) as s:
3674 with self.assertRaises(ssl.SSLError) as e:
3675 s.connect((HOST, server.port))
3676 self.assertIn("alert", str(e.exception))
3677
3678
3679 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3680 "required OpenSSL 1.1.0g")
3681 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3682 def test_min_max_version_sslv3(self):
3683 client_context, server_context, hostname = testing_context()
3684 server_context.minimum_version = ssl.TLSVersion.SSLv3
3685 client_context.minimum_version = ssl.TLSVersion.SSLv3
3686 client_context.maximum_version = ssl.TLSVersion.SSLv3
3687 with ThreadedEchoServer(context=server_context) as server:
3688 with client_context.wrap_socket(socket.socket(),
3689 server_hostname=hostname) as s:
3690 s.connect((HOST, server.port))
3691 self.assertEqual(s.version(), 'SSLv3')
3692
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003693 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3694 def test_default_ecdh_curve(self):
3695 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3696 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003697 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003698 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003699 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3700 # cipher name.
3701 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003702 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3703 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3704 # our default cipher list should prefer ECDH-based ciphers
3705 # automatically.
3706 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3707 context.set_ciphers("ECCdraft:ECDH")
3708 with ThreadedEchoServer(context=context) as server:
3709 with context.wrap_socket(socket.socket()) as s:
3710 s.connect((HOST, server.port))
3711 self.assertIn("ECDH", s.cipher()[0])
3712
3713 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3714 "'tls-unique' channel binding not available")
3715 def test_tls_unique_channel_binding(self):
3716 """Test tls-unique channel binding."""
3717 if support.verbose:
3718 sys.stdout.write("\n")
3719
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003720 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003721
3722 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003723 chatty=True,
3724 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003725
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003726 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003727 with client_context.wrap_socket(
3728 socket.socket(),
3729 server_hostname=hostname) as s:
3730 s.connect((HOST, server.port))
3731 # get the data
3732 cb_data = s.get_channel_binding("tls-unique")
3733 if support.verbose:
3734 sys.stdout.write(
3735 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003736
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003737 # check if it is sane
3738 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003739 if s.version() == 'TLSv1.3':
3740 self.assertEqual(len(cb_data), 48)
3741 else:
3742 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003743
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003744 # and compare with the peers version
3745 s.write(b"CB tls-unique\n")
3746 peer_data_repr = s.read().strip()
3747 self.assertEqual(peer_data_repr,
3748 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003749
3750 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003751 with client_context.wrap_socket(
3752 socket.socket(),
3753 server_hostname=hostname) as s:
3754 s.connect((HOST, server.port))
3755 new_cb_data = s.get_channel_binding("tls-unique")
3756 if support.verbose:
3757 sys.stdout.write(
3758 "got another channel binding data: {0!r}\n".format(
3759 new_cb_data)
3760 )
3761 # is it really unique
3762 self.assertNotEqual(cb_data, new_cb_data)
3763 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003764 if s.version() == 'TLSv1.3':
3765 self.assertEqual(len(cb_data), 48)
3766 else:
3767 self.assertEqual(len(cb_data), 12) # True for TLSv1
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003768 s.write(b"CB tls-unique\n")
3769 peer_data_repr = s.read().strip()
3770 self.assertEqual(peer_data_repr,
3771 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003772
3773 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003774 client_context, server_context, hostname = testing_context()
3775 stats = server_params_test(client_context, server_context,
3776 chatty=True, connectionchatty=True,
3777 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003778 if support.verbose:
3779 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3780 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3781
3782 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3783 "ssl.OP_NO_COMPRESSION needed for this test")
3784 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003785 client_context, server_context, hostname = testing_context()
3786 client_context.options |= ssl.OP_NO_COMPRESSION
3787 server_context.options |= ssl.OP_NO_COMPRESSION
3788 stats = server_params_test(client_context, server_context,
3789 chatty=True, connectionchatty=True,
3790 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003791 self.assertIs(stats['compression'], None)
3792
3793 def test_dh_params(self):
3794 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003795 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003796 # test scenario needs TLS <= 1.2
3797 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003798 server_context.load_dh_params(DHFILE)
3799 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003800 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003801 stats = server_params_test(client_context, server_context,
3802 chatty=True, connectionchatty=True,
3803 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003804 cipher = stats["cipher"][0]
3805 parts = cipher.split("-")
3806 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3807 self.fail("Non-DH cipher: " + cipher[0])
3808
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003809 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003810 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003811 def test_ecdh_curve(self):
3812 # server secp384r1, client auto
3813 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003814
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003815 server_context.set_ecdh_curve("secp384r1")
3816 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3817 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3818 stats = server_params_test(client_context, server_context,
3819 chatty=True, connectionchatty=True,
3820 sni_name=hostname)
3821
3822 # server auto, client secp384r1
3823 client_context, server_context, hostname = testing_context()
3824 client_context.set_ecdh_curve("secp384r1")
3825 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3826 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3827 stats = server_params_test(client_context, server_context,
3828 chatty=True, connectionchatty=True,
3829 sni_name=hostname)
3830
3831 # server / client curve mismatch
3832 client_context, server_context, hostname = testing_context()
3833 client_context.set_ecdh_curve("prime256v1")
3834 server_context.set_ecdh_curve("secp384r1")
3835 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3836 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3837 try:
3838 stats = server_params_test(client_context, server_context,
3839 chatty=True, connectionchatty=True,
3840 sni_name=hostname)
3841 except ssl.SSLError:
3842 pass
3843 else:
3844 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003845 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003846 self.fail("mismatch curve did not fail")
3847
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003848 def test_selected_alpn_protocol(self):
3849 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003850 client_context, server_context, hostname = testing_context()
3851 stats = server_params_test(client_context, server_context,
3852 chatty=True, connectionchatty=True,
3853 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003854 self.assertIs(stats['client_alpn_protocol'], None)
3855
3856 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3857 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3858 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003859 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003860 server_context.set_alpn_protocols(['foo', 'bar'])
3861 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003862 chatty=True, connectionchatty=True,
3863 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003864 self.assertIs(stats['client_alpn_protocol'], None)
3865
3866 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3867 def test_alpn_protocols(self):
3868 server_protocols = ['foo', 'bar', 'milkshake']
3869 protocol_tests = [
3870 (['foo', 'bar'], 'foo'),
3871 (['bar', 'foo'], 'foo'),
3872 (['milkshake'], 'milkshake'),
3873 (['http/3.0', 'http/4.0'], None)
3874 ]
3875 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003876 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003877 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003878 client_context.set_alpn_protocols(client_protocols)
3879
3880 try:
3881 stats = server_params_test(client_context,
3882 server_context,
3883 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003884 connectionchatty=True,
3885 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003886 except ssl.SSLError as e:
3887 stats = e
3888
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003889 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003890 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3891 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3892 self.assertIsInstance(stats, ssl.SSLError)
3893 else:
3894 msg = "failed trying %s (s) and %s (c).\n" \
3895 "was expecting %s, but got %%s from the %%s" \
3896 % (str(server_protocols), str(client_protocols),
3897 str(expected))
3898 client_result = stats['client_alpn_protocol']
3899 self.assertEqual(client_result, expected,
3900 msg % (client_result, "client"))
3901 server_result = stats['server_alpn_protocols'][-1] \
3902 if len(stats['server_alpn_protocols']) else 'nothing'
3903 self.assertEqual(server_result, expected,
3904 msg % (server_result, "server"))
3905
3906 def test_selected_npn_protocol(self):
3907 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003908 client_context, server_context, hostname = testing_context()
3909 stats = server_params_test(client_context, server_context,
3910 chatty=True, connectionchatty=True,
3911 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912 self.assertIs(stats['client_npn_protocol'], None)
3913
3914 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3915 def test_npn_protocols(self):
3916 server_protocols = ['http/1.1', 'spdy/2']
3917 protocol_tests = [
3918 (['http/1.1', 'spdy/2'], 'http/1.1'),
3919 (['spdy/2', 'http/1.1'], 'http/1.1'),
3920 (['spdy/2', 'test'], 'spdy/2'),
3921 (['abc', 'def'], 'abc')
3922 ]
3923 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003924 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003925 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003926 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003927 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003928 chatty=True, connectionchatty=True,
3929 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003930 msg = "failed trying %s (s) and %s (c).\n" \
3931 "was expecting %s, but got %%s from the %%s" \
3932 % (str(server_protocols), str(client_protocols),
3933 str(expected))
3934 client_result = stats['client_npn_protocol']
3935 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3936 server_result = stats['server_npn_protocols'][-1] \
3937 if len(stats['server_npn_protocols']) else 'nothing'
3938 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3939
3940 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003941 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003942 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003943 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003944 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003945 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003946 client_context.load_verify_locations(SIGNING_CA)
3947 return server_context, other_context, client_context
3948
3949 def check_common_name(self, stats, name):
3950 cert = stats['peercert']
3951 self.assertIn((('commonName', name),), cert['subject'])
3952
3953 @needs_sni
3954 def test_sni_callback(self):
3955 calls = []
3956 server_context, other_context, client_context = self.sni_contexts()
3957
Christian Heimesa170fa12017-09-15 20:27:30 +02003958 client_context.check_hostname = False
3959
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003960 def servername_cb(ssl_sock, server_name, initial_context):
3961 calls.append((server_name, initial_context))
3962 if server_name is not None:
3963 ssl_sock.context = other_context
3964 server_context.set_servername_callback(servername_cb)
3965
3966 stats = server_params_test(client_context, server_context,
3967 chatty=True,
3968 sni_name='supermessage')
3969 # The hostname was fetched properly, and the certificate was
3970 # changed for the connection.
3971 self.assertEqual(calls, [("supermessage", server_context)])
3972 # CERTFILE4 was selected
3973 self.check_common_name(stats, 'fakehostname')
3974
3975 calls = []
3976 # The callback is called with server_name=None
3977 stats = server_params_test(client_context, server_context,
3978 chatty=True,
3979 sni_name=None)
3980 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003981 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003982
3983 # Check disabling the callback
3984 calls = []
3985 server_context.set_servername_callback(None)
3986
3987 stats = server_params_test(client_context, server_context,
3988 chatty=True,
3989 sni_name='notfunny')
3990 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003991 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003992 self.assertEqual(calls, [])
3993
3994 @needs_sni
3995 def test_sni_callback_alert(self):
3996 # Returning a TLS alert is reflected to the connecting client
3997 server_context, other_context, client_context = self.sni_contexts()
3998
3999 def cb_returning_alert(ssl_sock, server_name, initial_context):
4000 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4001 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004002 with self.assertRaises(ssl.SSLError) as cm:
4003 stats = server_params_test(client_context, server_context,
4004 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004005 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004006 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004007
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004008 @needs_sni
4009 def test_sni_callback_raising(self):
4010 # Raising fails the connection with a TLS handshake failure alert.
4011 server_context, other_context, client_context = self.sni_contexts()
4012
4013 def cb_raising(ssl_sock, server_name, initial_context):
4014 1/0
4015 server_context.set_servername_callback(cb_raising)
4016
4017 with self.assertRaises(ssl.SSLError) as cm, \
4018 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004019 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004020 chatty=False,
4021 sni_name='supermessage')
4022 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4023 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004024
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004025 @needs_sni
4026 def test_sni_callback_wrong_return_type(self):
4027 # Returning the wrong return type terminates the TLS connection
4028 # with an internal error alert.
4029 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004030
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004031 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4032 return "foo"
4033 server_context.set_servername_callback(cb_wrong_return_type)
4034
4035 with self.assertRaises(ssl.SSLError) as cm, \
4036 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004037 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004038 chatty=False,
4039 sni_name='supermessage')
4040 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4041 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004042
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004043 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004044 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004045 client_context.set_ciphers("AES128:AES256")
4046 server_context.set_ciphers("AES256")
4047 expected_algs = [
4048 "AES256", "AES-256",
4049 # TLS 1.3 ciphers are always enabled
4050 "TLS_CHACHA20", "TLS_AES",
4051 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004052
Christian Heimesa170fa12017-09-15 20:27:30 +02004053 stats = server_params_test(client_context, server_context,
4054 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004055 ciphers = stats['server_shared_ciphers'][0]
4056 self.assertGreater(len(ciphers), 0)
4057 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004058 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004059 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004060
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004061 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004062 client_context, server_context, hostname = testing_context()
4063 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004064
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004065 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004066 s = client_context.wrap_socket(socket.socket(),
4067 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004068 s.connect((HOST, server.port))
4069 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004070
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004071 self.assertRaises(ValueError, s.read, 1024)
4072 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004073
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004074 def test_sendfile(self):
4075 TEST_DATA = b"x" * 512
4076 with open(support.TESTFN, 'wb') as f:
4077 f.write(TEST_DATA)
4078 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004079 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004080 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004081 context.load_verify_locations(SIGNING_CA)
4082 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004083 server = ThreadedEchoServer(context=context, chatty=False)
4084 with server:
4085 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004086 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004087 with open(support.TESTFN, 'rb') as file:
4088 s.sendfile(file)
4089 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004091 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004092 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004093 # TODO: sessions aren't compatible with TLSv1.3 yet
4094 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004095
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004097 stats = server_params_test(client_context, server_context,
4098 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004099 session = stats['session']
4100 self.assertTrue(session.id)
4101 self.assertGreater(session.time, 0)
4102 self.assertGreater(session.timeout, 0)
4103 self.assertTrue(session.has_ticket)
4104 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4105 self.assertGreater(session.ticket_lifetime_hint, 0)
4106 self.assertFalse(stats['session_reused'])
4107 sess_stat = server_context.session_stats()
4108 self.assertEqual(sess_stat['accept'], 1)
4109 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004110
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004111 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004112 stats = server_params_test(client_context, server_context,
4113 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004114 sess_stat = server_context.session_stats()
4115 self.assertEqual(sess_stat['accept'], 2)
4116 self.assertEqual(sess_stat['hits'], 1)
4117 self.assertTrue(stats['session_reused'])
4118 session2 = stats['session']
4119 self.assertEqual(session2.id, session.id)
4120 self.assertEqual(session2, session)
4121 self.assertIsNot(session2, session)
4122 self.assertGreaterEqual(session2.time, session.time)
4123 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004124
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004125 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004126 stats = server_params_test(client_context, server_context,
4127 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004128 self.assertFalse(stats['session_reused'])
4129 session3 = stats['session']
4130 self.assertNotEqual(session3.id, session.id)
4131 self.assertNotEqual(session3, session)
4132 sess_stat = server_context.session_stats()
4133 self.assertEqual(sess_stat['accept'], 3)
4134 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004135
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004136 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004137 stats = server_params_test(client_context, server_context,
4138 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004139 self.assertTrue(stats['session_reused'])
4140 session4 = stats['session']
4141 self.assertEqual(session4.id, session.id)
4142 self.assertEqual(session4, session)
4143 self.assertGreaterEqual(session4.time, session.time)
4144 self.assertGreaterEqual(session4.timeout, session.timeout)
4145 sess_stat = server_context.session_stats()
4146 self.assertEqual(sess_stat['accept'], 4)
4147 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004148
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004149 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004150 client_context, server_context, hostname = testing_context()
4151 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004152
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004153 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004154 client_context.options |= ssl.OP_NO_TLSv1_3
4155 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004156
Christian Heimesa170fa12017-09-15 20:27:30 +02004157 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004158 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004159 with client_context.wrap_socket(socket.socket(),
4160 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004161 # session is None before handshake
4162 self.assertEqual(s.session, None)
4163 self.assertEqual(s.session_reused, None)
4164 s.connect((HOST, server.port))
4165 session = s.session
4166 self.assertTrue(session)
4167 with self.assertRaises(TypeError) as e:
4168 s.session = object
Miss Islington (bot)42198572018-06-11 17:58:06 -07004169 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004170
Christian Heimesa170fa12017-09-15 20:27:30 +02004171 with client_context.wrap_socket(socket.socket(),
4172 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004173 s.connect((HOST, server.port))
4174 # cannot set session after handshake
4175 with self.assertRaises(ValueError) as e:
4176 s.session = session
4177 self.assertEqual(str(e.exception),
4178 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004179
Christian Heimesa170fa12017-09-15 20:27:30 +02004180 with client_context.wrap_socket(socket.socket(),
4181 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004182 # can set session before handshake and before the
4183 # connection was established
4184 s.session = session
4185 s.connect((HOST, server.port))
4186 self.assertEqual(s.session.id, session.id)
4187 self.assertEqual(s.session, session)
4188 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004189
Christian Heimesa170fa12017-09-15 20:27:30 +02004190 with client_context2.wrap_socket(socket.socket(),
4191 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004192 # cannot re-use session with a different SSLContext
4193 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004194 s.session = session
4195 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004196 self.assertEqual(str(e.exception),
4197 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004198
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004199
Christian Heimes2756ef32018-09-23 09:22:52 +02004200@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4201class TestPostHandshakeAuth(unittest.TestCase):
4202 def test_pha_setter(self):
4203 protocols = [
4204 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4205 ]
4206 for protocol in protocols:
4207 ctx = ssl.SSLContext(protocol)
4208 self.assertEqual(ctx.post_handshake_auth, False)
4209
4210 ctx.post_handshake_auth = True
4211 self.assertEqual(ctx.post_handshake_auth, True)
4212
4213 ctx.verify_mode = ssl.CERT_REQUIRED
4214 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4215 self.assertEqual(ctx.post_handshake_auth, True)
4216
4217 ctx.post_handshake_auth = False
4218 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4219 self.assertEqual(ctx.post_handshake_auth, False)
4220
4221 ctx.verify_mode = ssl.CERT_OPTIONAL
4222 ctx.post_handshake_auth = True
4223 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4224 self.assertEqual(ctx.post_handshake_auth, True)
4225
4226 def test_pha_required(self):
4227 client_context, server_context, hostname = testing_context()
4228 server_context.post_handshake_auth = True
4229 server_context.verify_mode = ssl.CERT_REQUIRED
4230 client_context.post_handshake_auth = True
4231 client_context.load_cert_chain(SIGNED_CERTFILE)
4232
4233 server = ThreadedEchoServer(context=server_context, chatty=False)
4234 with server:
4235 with client_context.wrap_socket(socket.socket(),
4236 server_hostname=hostname) as s:
4237 s.connect((HOST, server.port))
4238 s.write(b'HASCERT')
4239 self.assertEqual(s.recv(1024), b'FALSE\n')
4240 s.write(b'PHA')
4241 self.assertEqual(s.recv(1024), b'OK\n')
4242 s.write(b'HASCERT')
4243 self.assertEqual(s.recv(1024), b'TRUE\n')
4244 # PHA method just returns true when cert is already available
4245 s.write(b'PHA')
4246 self.assertEqual(s.recv(1024), b'OK\n')
4247 s.write(b'GETCERT')
4248 cert_text = s.recv(4096).decode('us-ascii')
4249 self.assertIn('Python Software Foundation CA', cert_text)
4250
4251 def test_pha_required_nocert(self):
4252 client_context, server_context, hostname = testing_context()
4253 server_context.post_handshake_auth = True
4254 server_context.verify_mode = ssl.CERT_REQUIRED
4255 client_context.post_handshake_auth = True
4256
4257 server = ThreadedEchoServer(context=server_context, chatty=False)
4258 with server:
4259 with client_context.wrap_socket(socket.socket(),
4260 server_hostname=hostname) as s:
4261 s.connect((HOST, server.port))
4262 s.write(b'PHA')
4263 # receive CertificateRequest
4264 self.assertEqual(s.recv(1024), b'OK\n')
4265 # send empty Certificate + Finish
4266 s.write(b'HASCERT')
4267 # receive alert
4268 with self.assertRaisesRegex(
4269 ssl.SSLError,
4270 'tlsv13 alert certificate required'):
4271 s.recv(1024)
4272
4273 def test_pha_optional(self):
4274 if support.verbose:
4275 sys.stdout.write("\n")
4276
4277 client_context, server_context, hostname = testing_context()
4278 server_context.post_handshake_auth = True
4279 server_context.verify_mode = ssl.CERT_REQUIRED
4280 client_context.post_handshake_auth = True
4281 client_context.load_cert_chain(SIGNED_CERTFILE)
4282
4283 # check CERT_OPTIONAL
4284 server_context.verify_mode = ssl.CERT_OPTIONAL
4285 server = ThreadedEchoServer(context=server_context, chatty=False)
4286 with server:
4287 with client_context.wrap_socket(socket.socket(),
4288 server_hostname=hostname) as s:
4289 s.connect((HOST, server.port))
4290 s.write(b'HASCERT')
4291 self.assertEqual(s.recv(1024), b'FALSE\n')
4292 s.write(b'PHA')
4293 self.assertEqual(s.recv(1024), b'OK\n')
4294 s.write(b'HASCERT')
4295 self.assertEqual(s.recv(1024), b'TRUE\n')
4296
4297 def test_pha_optional_nocert(self):
4298 if support.verbose:
4299 sys.stdout.write("\n")
4300
4301 client_context, server_context, hostname = testing_context()
4302 server_context.post_handshake_auth = True
4303 server_context.verify_mode = ssl.CERT_OPTIONAL
4304 client_context.post_handshake_auth = True
4305
4306 server = ThreadedEchoServer(context=server_context, chatty=False)
4307 with server:
4308 with client_context.wrap_socket(socket.socket(),
4309 server_hostname=hostname) as s:
4310 s.connect((HOST, server.port))
4311 s.write(b'HASCERT')
4312 self.assertEqual(s.recv(1024), b'FALSE\n')
4313 s.write(b'PHA')
4314 self.assertEqual(s.recv(1024), b'OK\n')
4315 # optional doens't fail when client does not have a cert
4316 s.write(b'HASCERT')
4317 self.assertEqual(s.recv(1024), b'FALSE\n')
4318
4319 def test_pha_no_pha_client(self):
4320 client_context, server_context, hostname = testing_context()
4321 server_context.post_handshake_auth = True
4322 server_context.verify_mode = ssl.CERT_REQUIRED
4323 client_context.load_cert_chain(SIGNED_CERTFILE)
4324
4325 server = ThreadedEchoServer(context=server_context, chatty=False)
4326 with server:
4327 with client_context.wrap_socket(socket.socket(),
4328 server_hostname=hostname) as s:
4329 s.connect((HOST, server.port))
4330 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4331 s.verify_client_post_handshake()
4332 s.write(b'PHA')
4333 self.assertIn(b'extension not received', s.recv(1024))
4334
4335 def test_pha_no_pha_server(self):
4336 # server doesn't have PHA enabled, cert is requested in handshake
4337 client_context, server_context, hostname = testing_context()
4338 server_context.verify_mode = ssl.CERT_REQUIRED
4339 client_context.post_handshake_auth = True
4340 client_context.load_cert_chain(SIGNED_CERTFILE)
4341
4342 server = ThreadedEchoServer(context=server_context, chatty=False)
4343 with server:
4344 with client_context.wrap_socket(socket.socket(),
4345 server_hostname=hostname) as s:
4346 s.connect((HOST, server.port))
4347 s.write(b'HASCERT')
4348 self.assertEqual(s.recv(1024), b'TRUE\n')
4349 # PHA doesn't fail if there is already a cert
4350 s.write(b'PHA')
4351 self.assertEqual(s.recv(1024), b'OK\n')
4352 s.write(b'HASCERT')
4353 self.assertEqual(s.recv(1024), b'TRUE\n')
4354
4355 def test_pha_not_tls13(self):
4356 # TLS 1.2
4357 client_context, server_context, hostname = testing_context()
4358 server_context.verify_mode = ssl.CERT_REQUIRED
4359 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4360 client_context.post_handshake_auth = True
4361 client_context.load_cert_chain(SIGNED_CERTFILE)
4362
4363 server = ThreadedEchoServer(context=server_context, chatty=False)
4364 with server:
4365 with client_context.wrap_socket(socket.socket(),
4366 server_hostname=hostname) as s:
4367 s.connect((HOST, server.port))
4368 # PHA fails for TLS != 1.3
4369 s.write(b'PHA')
4370 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4371
4372
Thomas Woutersed03b412007-08-28 21:37:11 +00004373def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004374 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004375 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004376 plats = {
4377 'Linux': platform.linux_distribution,
4378 'Mac': platform.mac_ver,
4379 'Windows': platform.win32_ver,
4380 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03004381 with warnings.catch_warnings():
4382 warnings.filterwarnings(
4383 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04004384 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03004385 'functions are deprecated .*',
Xtreakf5770f32018-07-05 22:29:46 +05304386 DeprecationWarning,
Berker Peksag9e7990a2015-05-16 23:21:26 +03004387 )
4388 for name, func in plats.items():
4389 plat = func()
4390 if plat and plat[0]:
4391 plat = '%s %r' % (name, plat)
4392 break
4393 else:
4394 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004395 print("test_ssl: testing with %r %r" %
4396 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4397 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004398 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004399 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4400 try:
4401 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4402 except AttributeError:
4403 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004404
Antoine Pitrou152efa22010-05-16 18:19:27 +00004405 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004406 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004407 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004408 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004409 BADCERT, BADKEY, EMPTYCERT]:
4410 if not os.path.exists(filename):
4411 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004412
Martin Panter3840b2a2016-03-27 01:53:46 +00004413 tests = [
4414 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004415 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes2756ef32018-09-23 09:22:52 +02004416 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004417 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004418
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004419 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004420 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004421
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004422 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004423 try:
4424 support.run_unittest(*tests)
4425 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004426 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004427
4428if __name__ == "__main__":
4429 test_main()