blob: d978a9e1b0feabae138f14101cfa33930c06014a [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
# Sorted contents of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host used by the tests, taken from the test support package.
HOST = support.HOST
# True when the linked library reports itself as LibreSSL.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# Version gates; both are always False when running against LibreSSL.
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build-time configuration variable of the same name (as reported by sysconfig).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file that lives next to this module.

    The positional *name* components are joined, in order, onto the
    directory containing this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# All paths below are resolved relative to this module via data_file().
# The BYTES_* names are the filesystem-encoded (bytes) form of the same path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# NOTE(review): presumably the password for the *_PROTECTED files -- confirm.
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
# Expected decoded form of CERTFILE; test_parse_cert compares it with the
# result of ssl._ssl._test_decode_cert(CERTFILE).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Jan 17 19:09:06 2028 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': 'F9BA076D5B6ABD9B',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
# Hostname that testing_context() hands back when SIGNED_CERTFILE is used.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
# Expected decoded form of SIGNED_CERTFILE; test_parse_cert compares it with
# the result of ssl._ssl._test_decode_cert(SIGNED_CERTFILE).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Nov 28 19:09:06 2027 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': '82EDBF41C880919C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
100
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Hostname that testing_context() hands back when SIGNED_CERTFILE2 is used.
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately unusable inputs (names suggest empty / malformed / missing
# files -- the file contents are not visible from here).
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
# Filesystem-encoded (bytes) form of DHFILE.
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
# Not defined in all versions of OpenSSL
# Each name falls back to 0 when the ssl module does not expose the option.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write the exception currently being handled to stdout in verbose mode.

    *prefix* is written immediately before the formatted traceback.  Nothing
    is written (and no traceback is formatted) unless support.verbose is set.
    """
    if not support.verbose:
        # Formatting a traceback is wasted work when the text is discarded.
        return
    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True when ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
144
def have_verify_flags():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
148
def _have_secp_curves():
    """Report whether the "secp384r1" ECDH curve can be selected.

    Returns False when the ssl module has no ECDH support, or when a
    server-side context rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        probe.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    return True


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    # time.altzone / time.timezone are negated to obtain the offset.
    seconds = time.altzone if dst_in_effect else time.timezone
    return -seconds
169
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8.i.

    For every other API version *cert_time* is returned unchanged.  For
    0.9.8.i the seconds are zeroed and a leading zero in the day field is
    replaced by a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    adjusted = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on a patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2 constant, *func* is returned
    unchanged.  Otherwise the wrapper tries to create an SSLv2 context; if
    that raises ssl.SSLError on OpenSSL 0.9.8o with the distribution
    identified as ('debian', 'squeeze/sid', ''), unittest.SkipTest is raised.
    In every other case *func* is called normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # platform.linux_distribution() was removed in Python 3.8; without
            # it the distribution cannot be identified, so do not skip.
            linux_distribution = getattr(platform, 'linux_distribution', None)
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    linux_distribution is not None and
                    linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one associated
    with the chosen certificate.  Both contexts load SIGNING_CA as their
    verification location; the server context additionally loads
    *server_cert* as its certificate chain.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # The CA must be loaded into the *server* context here (previously the
    # client context was loaded a second time by mistake), so that the server
    # side has the signing CA available for verification as well.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Antoine Pitrou172f0252014-04-18 20:33:08 +0200266 def test_str_for_enums(self):
267 # Make sure that the PROTOCOL_* constants have enum-like string
268 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200269 proto = ssl.PROTOCOL_TLS
270 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 ctx = ssl.SSLContext(proto)
272 self.assertIs(ctx.protocol, proto)
273
Antoine Pitrou480a1242010-04-28 21:37:09 +0000274 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000275 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000276 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000277 sys.stdout.write("\n RAND_status is %d (%s)\n"
278 % (v, (v and "sufficient randomness") or
279 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200280
281 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
282 self.assertEqual(len(data), 16)
283 self.assertEqual(is_cryptographic, v == 1)
284 if v:
285 data = ssl.RAND_bytes(16)
286 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200287 else:
288 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200289
Victor Stinner1e81a392013-12-19 16:47:04 +0100290 # negative num is invalid
291 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
292 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
293
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100294 if hasattr(ssl, 'RAND_egd'):
295 self.assertRaises(TypeError, ssl.RAND_egd, 1)
296 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000297 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200298 ssl.RAND_add(b"this is a random bytes object", 75.0)
299 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000300
    @unittest.skipUnless(os.name == 'posix', 'requires posix')
    def test_random_fork(self):
        """Check that a forked child and its parent get different random bytes.

        The child writes 16 pseudo-random bytes into a pipe; the parent reads
        them and compares them with 16 bytes it generates itself.
        """
        status = ssl.RAND_status()
        if not status:
            self.fail("OpenSSL's PRNG has insufficient randomness")

        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child process: never returns into the test runner; it always
            # leaves through os._exit() with 0 on success and 1 on any error.
            try:
                os.close(rfd)
                child_random = ssl.RAND_pseudo_bytes(16)[0]
                self.assertEqual(len(child_random), 16)
                os.write(wfd, child_random)
                os.close(wfd)
            except BaseException:
                os._exit(1)
            else:
                os._exit(0)
        else:
            # Parent process: close the unused write end, make sure the read
            # end is closed at cleanup, then wait for the child to finish.
            os.close(wfd)
            self.addCleanup(os.close, rfd)
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0)

            child_random = os.read(rfd, 16)
            self.assertEqual(len(child_random), 16)
            parent_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(parent_random), 16)

            self.assertNotEqual(child_random, parent_random)
332
Antoine Pitrou480a1242010-04-28 21:37:09 +0000333 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000334 # note that this uses an 'unofficial' function in _ssl.c,
335 # provided solely for this test, to exercise the certificate
336 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100337 self.assertEqual(
338 ssl._ssl._test_decode_cert(CERTFILE),
339 CERTFILE_INFO
340 )
341 self.assertEqual(
342 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
343 SIGNED_CERTFILE_INFO
344 )
345
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200346 # Issue #13034: the subjectAltName in some certificates
347 # (notably projects.developer.nokia.com:443) wasn't parsed
348 p = ssl._ssl._test_decode_cert(NOKIACERT)
349 if support.verbose:
350 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
351 self.assertEqual(p['subjectAltName'],
352 (('DNS', 'projects.developer.nokia.com'),
353 ('DNS', 'projects.forum.nokia.com'))
354 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100355 # extra OCSP and AIA fields
356 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
357 self.assertEqual(p['caIssuers'],
358 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
359 self.assertEqual(p['crlDistributionPoints'],
360 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000361
Christian Heimes824f7f32013-08-17 00:54:47 +0200362 def test_parse_cert_CVE_2013_4238(self):
363 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
364 if support.verbose:
365 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
366 subject = ((('countryName', 'US'),),
367 (('stateOrProvinceName', 'Oregon'),),
368 (('localityName', 'Beaverton'),),
369 (('organizationName', 'Python Software Foundation'),),
370 (('organizationalUnitName', 'Python Core Development'),),
371 (('commonName', 'null.python.org\x00example.org'),),
372 (('emailAddress', 'python-dev@python.org'),))
373 self.assertEqual(p['subject'], subject)
374 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200375 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
376 san = (('DNS', 'altnull.python.org\x00example.com'),
377 ('email', 'null@python.org\x00user@example.org'),
378 ('URI', 'http://null.python.org\x00http://example.org'),
379 ('IP Address', '192.0.2.1'),
380 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
381 else:
382 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
383 san = (('DNS', 'altnull.python.org\x00example.com'),
384 ('email', 'null@python.org\x00user@example.org'),
385 ('URI', 'http://null.python.org\x00http://example.org'),
386 ('IP Address', '192.0.2.1'),
387 ('IP Address', '<invalid>'))
388
389 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200390
Christian Heimes1c03abd2016-09-06 23:25:35 +0200391 def test_parse_all_sans(self):
392 p = ssl._ssl._test_decode_cert(ALLSANFILE)
393 self.assertEqual(p['subjectAltName'],
394 (
395 ('DNS', 'allsans'),
396 ('othername', '<unsupported>'),
397 ('othername', '<unsupported>'),
398 ('email', 'user@example.org'),
399 ('DNS', 'www.example.org'),
400 ('DirName',
401 ((('countryName', 'XY'),),
402 (('localityName', 'Castle Anthrax'),),
403 (('organizationName', 'Python Software Foundation'),),
404 (('commonName', 'dirname example'),))),
405 ('URI', 'https://www.python.org/'),
406 ('IP Address', '127.0.0.1'),
407 ('IP Address', '0:0:0:0:0:0:0:1\n'),
408 ('Registered ID', '1.2.3.4.5')
409 )
410 )
411
Antoine Pitrou480a1242010-04-28 21:37:09 +0000412 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000413 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000414 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000415 d1 = ssl.PEM_cert_to_DER_cert(pem)
416 p2 = ssl.DER_cert_to_PEM_cert(d1)
417 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000418 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000419 if not p2.startswith(ssl.PEM_HEADER + '\n'):
420 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
421 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
422 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000423
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000424 def test_openssl_version(self):
425 n = ssl.OPENSSL_VERSION_NUMBER
426 t = ssl.OPENSSL_VERSION_INFO
427 s = ssl.OPENSSL_VERSION
428 self.assertIsInstance(n, int)
429 self.assertIsInstance(t, tuple)
430 self.assertIsInstance(s, str)
431 # Some sanity checks follow
432 # >= 0.9
433 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400434 # < 3.0
435 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000436 major, minor, fix, patch, status = t
437 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400438 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000439 self.assertGreaterEqual(minor, 0)
440 self.assertLess(minor, 256)
441 self.assertGreaterEqual(fix, 0)
442 self.assertLess(fix, 256)
443 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100444 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000445 self.assertGreaterEqual(status, 0)
446 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400447 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200448 if IS_LIBRESSL:
449 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100450 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400451 else:
452 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100453 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000454
    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = test_wrap_socket(s)
        # A weak reference lets the test observe whether the wrapped socket
        # is still alive without itself keeping it alive.
        wr = weakref.ref(ss)
        # Dropping the only strong reference; a ResourceWarning is expected
        # here (presumably for the never-closed socket -- confirm).
        with support.check_warnings(("", ResourceWarning)):
            del ss
        # The referent must be gone as soon as the name is deleted.
        self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000465
Antoine Pitroua468adc2010-09-14 14:43:44 +0000466 def test_wrapped_unconnected(self):
467 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200468 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000469 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200470 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100471 self.assertRaises(OSError, ss.recv, 1)
472 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
473 self.assertRaises(OSError, ss.recvfrom, 1)
474 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
475 self.assertRaises(OSError, ss.send, b'x')
476 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800477 self.assertRaises(NotImplementedError, ss.sendmsg,
478 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000479
Antoine Pitrou40f08742010-04-24 22:04:40 +0000480 def test_timeout(self):
481 # Issue #8524: when creating an SSL socket, the timeout of the
482 # original socket should be retained.
483 for timeout in (None, 0.0, 5.0):
484 s = socket.socket(socket.AF_INET)
485 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200486 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100487 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000488
Christian Heimesd0486372016-09-10 23:23:33 +0200489 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000490 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000491 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000492 "certfile must be specified",
493 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000494 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000495 "certfile must be specified for server-side operations",
496 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000497 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000498 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200499 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100500 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
501 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200502 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200503 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000504 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000505 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000506 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200507 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000508 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000509 ssl.wrap_socket(sock,
510 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000511 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200512 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000513 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000514 ssl.wrap_socket(sock,
515 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000516 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000517
Martin Panter3464ea22016-02-01 21:58:11 +0000518 def bad_cert_test(self, certfile):
519 """Check that trying to use the given client certificate fails"""
520 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
521 certfile)
522 sock = socket.socket()
523 self.addCleanup(sock.close)
524 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200525 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200526 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000527
528 def test_empty_cert(self):
529 """Wrapping with an empty cert file"""
530 self.bad_cert_test("nullcert.pem")
531
532 def test_malformed_cert(self):
533 """Wrapping with a badly formatted certificate (syntax error)"""
534 self.bad_cert_test("badcert.pem")
535
536 def test_malformed_key(self):
537 """Wrapping with a badly formatted key (syntax error)"""
538 self.bad_cert_test("badkey.pem")
539
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000540 def test_match_hostname(self):
541 def ok(cert, hostname):
542 ssl.match_hostname(cert, hostname)
543 def fail(cert, hostname):
544 self.assertRaises(ssl.CertificateError,
545 ssl.match_hostname, cert, hostname)
546
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100547 # -- Hostname matching --
548
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000549 cert = {'subject': ((('commonName', 'example.com'),),)}
550 ok(cert, 'example.com')
551 ok(cert, 'ExAmple.cOm')
552 fail(cert, 'www.example.com')
553 fail(cert, '.example.com')
554 fail(cert, 'example.org')
555 fail(cert, 'exampleXcom')
556
557 cert = {'subject': ((('commonName', '*.a.com'),),)}
558 ok(cert, 'foo.a.com')
559 fail(cert, 'bar.foo.a.com')
560 fail(cert, 'a.com')
561 fail(cert, 'Xa.com')
562 fail(cert, '.a.com')
563
Mandeep Singhede2ac92017-11-27 04:01:27 +0530564 # only match wildcards when they are the only thing
565 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000566 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530567 fail(cert, 'foo.com')
568 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000569 fail(cert, 'bar.com')
570 fail(cert, 'foo.a.com')
571 fail(cert, 'bar.foo.com')
572
Christian Heimes824f7f32013-08-17 00:54:47 +0200573 # NULL bytes are bad, CVE-2013-4073
574 cert = {'subject': ((('commonName',
575 'null.python.org\x00example.org'),),)}
576 ok(cert, 'null.python.org\x00example.org') # or raise an error?
577 fail(cert, 'example.org')
578 fail(cert, 'null.python.org')
579
Georg Brandl72c98d32013-10-27 07:16:53 +0100580 # error cases with wildcards
581 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
582 fail(cert, 'bar.foo.a.com')
583 fail(cert, 'a.com')
584 fail(cert, 'Xa.com')
585 fail(cert, '.a.com')
586
587 cert = {'subject': ((('commonName', 'a.*.com'),),)}
588 fail(cert, 'a.foo.com')
589 fail(cert, 'a..com')
590 fail(cert, 'a.com')
591
592 # wildcard doesn't match IDNA prefix 'xn--'
593 idna = 'püthon.python.org'.encode("idna").decode("ascii")
594 cert = {'subject': ((('commonName', idna),),)}
595 ok(cert, idna)
596 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
597 fail(cert, idna)
598 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
599 fail(cert, idna)
600
601 # wildcard in first fragment and IDNA A-labels in sequent fragments
602 # are supported.
603 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
604 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530605 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
606 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100607 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
608 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
609
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000610 # Slightly fake real-world example
611 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
612 'subject': ((('commonName', 'linuxfrz.org'),),),
613 'subjectAltName': (('DNS', 'linuxfr.org'),
614 ('DNS', 'linuxfr.com'),
615 ('othername', '<unsupported>'))}
616 ok(cert, 'linuxfr.org')
617 ok(cert, 'linuxfr.com')
618 # Not a "DNS" entry
619 fail(cert, '<unsupported>')
620 # When there is a subjectAltName, commonName isn't used
621 fail(cert, 'linuxfrz.org')
622
623 # A pristine real-world example
624 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
625 'subject': ((('countryName', 'US'),),
626 (('stateOrProvinceName', 'California'),),
627 (('localityName', 'Mountain View'),),
628 (('organizationName', 'Google Inc'),),
629 (('commonName', 'mail.google.com'),))}
630 ok(cert, 'mail.google.com')
631 fail(cert, 'gmail.com')
632 # Only commonName is considered
633 fail(cert, 'California')
634
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100635 # -- IPv4 matching --
636 cert = {'subject': ((('commonName', 'example.com'),),),
637 'subjectAltName': (('DNS', 'example.com'),
638 ('IP Address', '10.11.12.13'),
639 ('IP Address', '14.15.16.17'))}
640 ok(cert, '10.11.12.13')
641 ok(cert, '14.15.16.17')
642 fail(cert, '14.15.16.18')
643 fail(cert, 'example.net')
644
645 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800646 if hasattr(socket, 'AF_INET6'):
647 cert = {'subject': ((('commonName', 'example.com'),),),
648 'subjectAltName': (
649 ('DNS', 'example.com'),
650 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
651 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
652 ok(cert, '2001::cafe')
653 ok(cert, '2003::baba')
654 fail(cert, '2003::bebe')
655 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100656
657 # -- Miscellaneous --
658
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000659 # Neither commonName nor subjectAltName
660 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
661 'subject': ((('countryName', 'US'),),
662 (('stateOrProvinceName', 'California'),),
663 (('localityName', 'Mountain View'),),
664 (('organizationName', 'Google Inc'),))}
665 fail(cert, 'mail.google.com')
666
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200667 # No DNS entry in subjectAltName but a commonName
668 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
669 'subject': ((('countryName', 'US'),),
670 (('stateOrProvinceName', 'California'),),
671 (('localityName', 'Mountain View'),),
672 (('commonName', 'mail.google.com'),)),
673 'subjectAltName': (('othername', 'blabla'), )}
674 ok(cert, 'mail.google.com')
675
676 # No DNS entry subjectAltName and no commonName
677 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
678 'subject': ((('countryName', 'US'),),
679 (('stateOrProvinceName', 'California'),),
680 (('localityName', 'Mountain View'),),
681 (('organizationName', 'Google Inc'),)),
682 'subjectAltName': (('othername', 'blabla'),)}
683 fail(cert, 'google.com')
684
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000685 # Empty cert / no cert
686 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
687 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
688
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200689 # Issue #17980: avoid denials of service by refusing more than one
690 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800691 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
692 with self.assertRaisesRegex(
693 ssl.CertificateError,
694 "partial wildcards in leftmost label are not supported"):
695 ssl.match_hostname(cert, 'axxb.example.com')
696
697 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
698 with self.assertRaisesRegex(
699 ssl.CertificateError,
700 "wildcard can only be present in the leftmost label"):
701 ssl.match_hostname(cert, 'www.sub.example.com')
702
703 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
704 with self.assertRaisesRegex(
705 ssl.CertificateError,
706 "too many wildcards"):
707 ssl.match_hostname(cert, 'axxbxxc.example.com')
708
709 cert = {'subject': ((('commonName', '*'),),)}
710 with self.assertRaisesRegex(
711 ssl.CertificateError,
712 "sole wildcard without additional labels are not support"):
713 ssl.match_hostname(cert, 'host')
714
715 cert = {'subject': ((('commonName', '*.com'),),)}
716 with self.assertRaisesRegex(
717 ssl.CertificateError,
718 r"hostname 'com' doesn't match '\*.com'"):
719 ssl.match_hostname(cert, 'com')
720
721 # extra checks for _inet_paton()
722 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
723 with self.assertRaises(ValueError):
724 ssl._inet_paton(invalid)
725 for ipaddr in ['127.0.0.1', '192.168.0.1']:
726 self.assertTrue(ssl._inet_paton(ipaddr))
727 if hasattr(socket, 'AF_INET6'):
728 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
729 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200730
Antoine Pitroud5323212010-10-22 18:19:07 +0000731 def test_server_side(self):
732 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200733 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000734 with socket.socket() as sock:
735 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
736 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000737
def test_unknown_channel_binding(self):
    # Asking for an unknown channel binding type should raise ValueError.
    #
    # Both plain sockets are registered for cleanup as soon as they are
    # created, so neither leaks if connecting, wrapping, or the assertion
    # itself fails part-way through the test.
    s = socket.socket(socket.AF_INET)
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    # No handshake is performed: only the type validation is exercised.
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200749
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected socket should return None for a known binding type;
    # the first set of options covers the client side, the second the
    # server side.
    wrap_options = (
        {},
        {"server_side": True, "certfile": CERTFILE},
    )
    for options in wrap_options:
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **options) as wrapped:
            self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200761
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
769
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths tuple.
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    # SSL_CERT_FILE / SSL_CERT_DIR set in the environment are picked up.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
781
Christian Heimes44109d72013-11-22 01:51:30 +0100782 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
783 def test_enum_certificates(self):
784 self.assertTrue(ssl.enum_certificates("CA"))
785 self.assertTrue(ssl.enum_certificates("ROOT"))
786
787 self.assertRaises(TypeError, ssl.enum_certificates)
788 self.assertRaises(WindowsError, ssl.enum_certificates, "")
789
Christian Heimesc2d65e12013-11-22 16:13:55 +0100790 trust_oids = set()
791 for storename in ("CA", "ROOT"):
792 store = ssl.enum_certificates(storename)
793 self.assertIsInstance(store, list)
794 for element in store:
795 self.assertIsInstance(element, tuple)
796 self.assertEqual(len(element), 3)
797 cert, enc, trust = element
798 self.assertIsInstance(cert, bytes)
799 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
800 self.assertIsInstance(trust, (set, bool))
801 if isinstance(trust, set):
802 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100803
804 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100805 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200806
Christian Heimes46bebee2013-06-09 19:03:31 +0200807 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100808 def test_enum_crls(self):
809 self.assertTrue(ssl.enum_crls("CA"))
810 self.assertRaises(TypeError, ssl.enum_crls)
811 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200812
Christian Heimes44109d72013-11-22 01:51:30 +0100813 crls = ssl.enum_crls("CA")
814 self.assertIsInstance(crls, list)
815 for element in crls:
816 self.assertIsInstance(element, tuple)
817 self.assertEqual(len(element), 2)
818 self.assertIsInstance(element[0], bytes)
819 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200820
Christian Heimes46bebee2013-06-09 19:03:31 +0200821
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100822 def test_asn1object(self):
823 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
824 '1.3.6.1.5.5.7.3.1')
825
826 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
827 self.assertEqual(val, expected)
828 self.assertEqual(val.nid, 129)
829 self.assertEqual(val.shortname, 'serverAuth')
830 self.assertEqual(val.longname, 'TLS Web Server Authentication')
831 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
832 self.assertIsInstance(val, ssl._ASN1Object)
833 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
834
835 val = ssl._ASN1Object.fromnid(129)
836 self.assertEqual(val, expected)
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100839 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
840 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100841 for i in range(1000):
842 try:
843 obj = ssl._ASN1Object.fromnid(i)
844 except ValueError:
845 pass
846 else:
847 self.assertIsInstance(obj.nid, int)
848 self.assertIsInstance(obj.shortname, str)
849 self.assertIsInstance(obj.longname, str)
850 self.assertIsInstance(obj.oid, (str, type(None)))
851
852 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
853 self.assertEqual(val, expected)
854 self.assertIsInstance(val, ssl._ASN1Object)
855 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
856 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
857 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100858 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
859 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100860
Christian Heimes72d28502013-11-23 13:56:58 +0100861 def test_purpose_enum(self):
862 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
863 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
864 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
865 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
866 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
867 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
868 '1.3.6.1.5.5.7.3.1')
869
870 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
871 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
872 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
873 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
874 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
875 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
876 '1.3.6.1.5.5.7.3.2')
877
def test_unsupported_dtls(self):
    # Wrapping a datagram socket is refused with the same message by both
    # the test_wrap_socket helper and SSLContext.wrap_socket.
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    wrappers = (
        lambda sock: test_wrap_socket(sock, cert_reqs=ssl.CERT_NONE),
        lambda sock: client_ctx.wrap_socket(sock),
    )
    for wrap in wrappers:
        with self.assertRaises(NotImplementedError) as raised:
            wrap(udp_sock)
        self.assertEqual(str(raised.exception),
                         "only stream sockets are supported")
888
def cert_time_ok(self, timestring, timestamp):
    # Helper: `timestring` must convert to exactly `timestamp`.
    converted = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(converted, timestamp)
891
def cert_time_fail(self, timestring):
    # Helper: converting `timestring` must raise ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
895
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, seconds in known_conversions:
        self.cert_time_ok(timestring, seconds)
903
def test_cert_time_to_seconds(self):
    # Table-driven checks of what ssl.cert_time_to_seconds() accepts
    # and rejects.
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)

    same_instant = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for text in same_instant:
        self.cert_time_ok(text, reference_ts)

    rejected = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for text in rejected:
        self.cert_time_fail(text)

    newyear_ts = 1230768000.0
    accepted = (
        # leap seconds
        ("Dec 31 23:59:60 2008 GMT", newyear_ts),
        # same timestamp
        ("Jan 1 00:00:00 2009 GMT", newyear_ts),
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for text, seconds in accepted:
        self.cert_time_ok(text, seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
938
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as rendered by the active locale.
    february_here = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if february_here.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent: the C-locale name parses, the local one does not
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february_here + " 9 00:00:00 2007 GMT")
953
def test_connect_ex_error(self):
    # connect_ex() to a bound-but-not-listening port must return one of
    # the expected errno values.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    port = support.bind_port(placeholder)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    result = client.connect_ex((HOST, port))
    self.assertIn(result, acceptable)
969
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100970
Antoine Pitrou152efa22010-05-16 18:19:27 +0000971class ContextTests(unittest.TestCase):
972
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every protocol in PROTOCOLS is accepted, the no-argument form uses
    # PROTOCOL_TLS, and unknown protocol numbers raise ValueError.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
981
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # A context reports the protocol it was constructed with.
    for requested in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(requested).protocol, requested)
987
988 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200989 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000990 ctx.set_ciphers("ALL")
991 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000992 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000993 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000994
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # No default cipher suite name may contain any of these markers.
    excluded_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in excluded_markers:
            self.assertNotIn(marker, suite_name)
1007
Christian Heimes25bfcd52016-09-06 00:04:45 +02001008 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1009 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001010 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001011 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001012 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001013 self.assertIn('AES256-GCM-SHA384', names)
1014 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001015
@skip_if_broken_ubuntu_ssl
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    baseline = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    # SSLContext also enables these by default
    baseline |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                 OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(baseline, client_ctx.options)

    # Setting an extra option is reflected in the reported value.
    client_ctx.options = client_ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(baseline | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Options cannot be cleared here, so resetting must fail.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options = (client_ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(baseline, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1037
Christian Heimesa170fa12017-09-15 20:27:30 +02001038 def test_verify_mode_protocol(self):
1039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001040 # Default value
1041 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1042 ctx.verify_mode = ssl.CERT_OPTIONAL
1043 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1044 ctx.verify_mode = ssl.CERT_REQUIRED
1045 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1046 ctx.verify_mode = ssl.CERT_NONE
1047 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1048 with self.assertRaises(TypeError):
1049 ctx.verify_mode = None
1050 with self.assertRaises(ValueError):
1051 ctx.verify_mode = 42
1052
Christian Heimesa170fa12017-09-15 20:27:30 +02001053 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1054 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1055 self.assertFalse(ctx.check_hostname)
1056
1057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1058 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1059 self.assertTrue(ctx.check_hostname)
1060
Christian Heimes61d478c2018-01-27 15:51:38 +01001061 def test_hostname_checks_common_name(self):
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertTrue(ctx.hostname_checks_common_name)
1064 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1065 ctx.hostname_checks_common_name = True
1066 self.assertTrue(ctx.hostname_checks_common_name)
1067 ctx.hostname_checks_common_name = False
1068 self.assertFalse(ctx.hostname_checks_common_name)
1069 ctx.hostname_checks_common_name = True
1070 self.assertTrue(ctx.hostname_checks_common_name)
1071 else:
1072 with self.assertRaises(AttributeError):
1073 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001074
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Individual flags round-trip through the attribute.
    single_flags = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
    )
    for flag in single_flags:
        server_ctx.verify_flags = flag
        self.assertEqual(server_ctx.verify_flags, flag)
    # supports any value
    server_ctx.verify_flags = (ssl.VERIFY_CRL_CHECK_LEAF |
                               ssl.VERIFY_X509_STRICT)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1094
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, mismatched pairs, and password-protected keys
    # (passwords given directly and through callbacks).

    def password_forms():
        # The key password as str, bytes and (a fresh) bytearray.
        return (KEY_PASSWORD,
                KEY_PASSWORD.encode(),
                bytearray(KEY_PASSWORD.encode()))

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(unusable)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for incomplete in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(incomplete)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Pass a password larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def failing_callback():
        raise Exception('getpass error')

    class PasswordProvider:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001177
def test_load_verify_locations(self):
    # Exercise load_verify_locations() with str and bytes paths,
    # positional and keyword forms, and invalid arguments.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(ca_file)
        ctx.load_verify_locations(cafile=ca_file, capath=None)
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1196
def test_load_verify_cadata(self):
    # test cadata: load CA certificates from in-memory PEM / DER data and
    # check the "x509_ca" counter reported by cert_store_stats().

    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_client_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def ca_count(context):
        return context.cert_store_stats()["x509_ca"]

    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    ctx = new_client_context()
    self.assertEqual(ca_count(ctx), 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(ctx), 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = new_client_context()
    ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
    self.assertEqual(ca_count(ctx), 2)

    # with junk around the certs
    ctx = new_client_context()
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(pieces))
    self.assertEqual(ca_count(ctx), 2)

    # test DER
    ctx = new_client_context()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = new_client_context()
    ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
    self.assertEqual(ca_count(ctx), 2)

    # error cases
    ctx = new_client_context()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1253
1254
def test_load_dh_params(self):
    # load_dh_params() accepts a str path (and a bytes path outside
    # Windows), rejects a missing or None argument, and reports a missing
    # file and a non-DH PEM file with distinct exception types.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    if os.name != 'nt':
        ctx.load_dh_params(BYTES_DHFILE)
    self.assertRaises(TypeError, ctx.load_dh_params)
    self.assertRaises(TypeError, ctx.load_dh_params, None)
    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # A certificate file holds no DH parameters.
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1267
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context reports all-zero session statistics,
    # whatever protocol it was created with.
    expected = {
        'number': 0,
        'connect': 0,
        'connect_good': 0,
        'connect_renegotiate': 0,
        'accept': 0,
        'accept_good': 0,
        'accept_renegotiate': 0,
        'hits': 0,
        'misses': 0,
        'timeouts': 0,
        'cache_full': 0,
    }
    for proto in PROTOCOLS:
        # subTest makes a failure name the offending protocol.
        with self.subTest(proto=proto):
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), expected)
1285
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001286 def test_set_default_verify_paths(self):
1287 # There's not much we can do to test that it acts as expected,
1288 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001289 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001290 ctx.set_default_verify_paths()
1291
Antoine Pitrou501da612011-12-21 09:27:41 +01001292 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001293 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001294 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001295 ctx.set_ecdh_curve("prime256v1")
1296 ctx.set_ecdh_curve(b"prime256v1")
1297 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1298 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1299 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1300 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1301
@needs_sni
def test_sni_callback(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    self.assertRaises(TypeError, ctx.set_servername_callback)
    self.assertRaises(TypeError, ctx.set_servername_callback, 4)
    self.assertRaises(TypeError, ctx.set_servername_callback, "")
    self.assertRaises(TypeError, ctx.set_servername_callback, ctx)

    def dummycallback(sock, servername, ctx):
        pass

    # Both clearing the callback and installing a callable are accepted.
    ctx.set_servername_callback(None)
    ctx.set_servername_callback(dummycallback)
1316
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold the context, while the
    # context holds the callback: ctx -> callback -> ctx.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass

    ctx.set_servername_callback(dummycallback)
    wr = weakref.ref(ctx)
    del ctx, dummycallback
    gc.collect()
    self.assertIs(wr(), None)
1329
def test_cert_store_stats(self):
    # cert_store_stats() counts what load_verify_locations() has put in
    # the store; load_cert_chain() leaves the counts untouched.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ctx.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 0})
    ctx.load_cert_chain(CERTFILE)
    self.assertEqual(ctx.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 0})
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 1})
    ctx.load_verify_locations(CAFILE_CACERT)
    self.assertEqual(ctx.cert_store_stats(),
                     {'x509_ca': 1, 'crl': 0, 'x509': 2})
1343
def test_get_ca_certs(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    ctx.load_verify_locations(CAFILE_CACERT)
    self.assertEqual(ctx.get_ca_certs(),
        [{'issuer': ((('organizationName', 'Root CA'),),
                     (('organizationalUnitName', 'http://www.cacert.org'),),
                     (('commonName', 'CA Cert Signing Authority'),),
                     (('emailAddress', 'support@cacert.org'),)),
          'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
          'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
          'serialNumber': '00',
          'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
          'subject': ((('organizationName', 'Root CA'),),
                      (('organizationalUnitName', 'http://www.cacert.org'),),
                      (('commonName', 'CA Cert Signing Authority'),),
                      (('emailAddress', 'support@cacert.org'),)),
          'version': 3}])

    # With binary_form=True the same certificate comes back DER-encoded.
    with open(CAFILE_CACERT) as f:
        pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    self.assertEqual(ctx.get_ca_certs(True), [der])
1371
Christian Heimes72d28502013-11-23 13:56:58 +01001372 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001373 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001374 ctx.load_default_certs()
1375
Christian Heimesa170fa12017-09-15 20:27:30 +02001376 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001377 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1378 ctx.load_default_certs()
1379
Christian Heimesa170fa12017-09-15 20:27:30 +02001380 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001381 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1382
Christian Heimesa170fa12017-09-15 20:27:30 +02001383 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001384 self.assertRaises(TypeError, ctx.load_default_certs, None)
1385 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1386
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE set, load_default_certs() loads
    # the single certificate from SSL_CERT_FILE into the store.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        ctx.load_default_certs()
        self.assertEqual(ctx.cert_store_stats(),
                         {"crl": 0, "x509": 1, "x509_ca": 0})
1396
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"),
                 "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what load_default_certs() puts in the store without the
    # environment variables set.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.load_default_certs()
    stats = ctx.cert_store_stats()

    # With the variables set, exactly one more certificate (the one in
    # SSL_CERT_FILE) is expected on top of the baseline.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        ctx.load_default_certs()
        stats["x509"] += 1
        self.assertEqual(ctx.cert_store_stats(), stats)
1411
def _assert_context_options(self, ctx):
    # Helper: check that *ctx* has the hardening options set.
    # OP_NO_SSLv2 is always required.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when non-zero: a value of 0
    # would make the masked comparison vacuous.
    for flag in (OP_NO_COMPRESSION, OP_SINGLE_DH_USE,
                 OP_SINGLE_ECDH_USE, OP_CIPHER_SERVER_PREFERENCE):
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1426
def test_create_default_context(self):
    # Default purpose: certificate and hostname verification enabled.
    ctx = ssl.create_default_context()

    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(ctx.check_hostname)
    self._assert_context_options(ctx)

    # Explicit cafile / capath / cadata are all accepted together.
    with open(SIGNING_CA) as f:
        cadata = f.read()
    ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                     cadata=cadata)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(ctx)

    # CLIENT_AUTH purpose (server-side context): no peer verification.
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001447
def test__create_stdlib_context(self):
    # Defaults: no certificate or hostname verification.
    ctx = ssl._create_stdlib_context()
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self.assertFalse(ctx.check_hostname)
    self._assert_context_options(ctx)

    # An explicit protocol is honoured.
    ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(ctx)

    # So are explicit cert_reqs / check_hostname.
    ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     check_hostname=True)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(ctx.check_hostname)
    self._assert_context_options(ctx)

    ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001472
Christian Heimes1aa9a752013-12-02 02:41:19 +01001473 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001475 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001476 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001477
Christian Heimese82c0342017-09-15 20:29:57 +02001478 # Auto set CERT_REQUIRED
1479 ctx.check_hostname = True
1480 self.assertTrue(ctx.check_hostname)
1481 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1482 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001483 ctx.verify_mode = ssl.CERT_REQUIRED
1484 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001485 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001486
Christian Heimese82c0342017-09-15 20:29:57 +02001487 # Changing verify_mode does not affect check_hostname
1488 ctx.check_hostname = False
1489 ctx.verify_mode = ssl.CERT_NONE
1490 ctx.check_hostname = False
1491 self.assertFalse(ctx.check_hostname)
1492 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1493 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001494 ctx.check_hostname = True
1495 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001496 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1497
1498 ctx.check_hostname = False
1499 ctx.verify_mode = ssl.CERT_OPTIONAL
1500 ctx.check_hostname = False
1501 self.assertFalse(ctx.check_hostname)
1502 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1503 # keep CERT_OPTIONAL
1504 ctx.check_hostname = True
1505 self.assertTrue(ctx.check_hostname)
1506 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001507
1508 # Cannot set CERT_NONE with check_hostname enabled
1509 with self.assertRaises(ValueError):
1510 ctx.verify_mode = ssl.CERT_NONE
1511 ctx.check_hostname = False
1512 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001513 ctx.verify_mode = ssl.CERT_NONE
1514 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001515
Christian Heimes5fe668c2016-09-12 00:01:11 +02001516 def test_context_client_server(self):
1517 # PROTOCOL_TLS_CLIENT has sane defaults
1518 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1519 self.assertTrue(ctx.check_hostname)
1520 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1521
1522 # PROTOCOL_TLS_SERVER has different but also sane defaults
1523 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1524 self.assertFalse(ctx.check_hostname)
1525 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1526
Christian Heimes4df60f12017-09-15 20:26:05 +02001527 def test_context_custom_class(self):
1528 class MySSLSocket(ssl.SSLSocket):
1529 pass
1530
1531 class MySSLObject(ssl.SSLObject):
1532 pass
1533
1534 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1535 ctx.sslsocket_class = MySSLSocket
1536 ctx.sslobject_class = MySSLObject
1537
1538 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1539 self.assertIsInstance(sock, MySSLSocket)
1540 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1541 self.assertIsInstance(obj, MySSLObject)
1542
Antoine Pitrou152efa22010-05-16 18:19:27 +00001543
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError, its subclasses and related argument errors."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # Both plain sockets are context-managed so neither leaks if
        # connect() or wrap_socket() fails.  Once wrap_socket() has taken
        # over the client socket, closing the plain one is harmless.
        with socket.socket() as listener, socket.socket() as client:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            client.connect(listener.getsockname())
            client.setblocking(False)
            with ctx.wrap_socket(client, False,
                                 do_handshake_on_connect=False) as c:
                # Nobody answers on the listening side, so the
                # non-blocking handshake must ask for more input.
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    c.do_handshake()
                msg = str(cm.exception)
                self.assertTrue(
                    msg.startswith("The operation did not complete (read)"),
                    msg)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # An empty hostname or one with a leading dot is rejected.
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
1595
1596
class MemoryBIOTests(unittest.TestCase):
    """Tests for the ssl.MemoryBIO in-memory buffer."""

    def test_read_write(self):
        # Data comes out in the order written; reading an empty BIO
        # returns b''; a sized read returns at most that many bytes.
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        bio.write(b'foo')
        bio.write(b'bar')
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        bio.write(b'baz')
        self.assertEqual(bio.read(2), b'ba')
        self.assertEqual(bio.read(1), b'z')
        self.assertEqual(bio.read(1), b'')

    def test_eof(self):
        # eof only becomes true once write_eof() has been called *and*
        # all buffered data has been read.
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        # pending tracks the number of bytes currently buffered.
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        for i in range(3):
            bio.read(1)
            self.assertEqual(bio.pending, 3-i-1)
        for i in range(3):
            bio.write(b'x')
            self.assertEqual(bio.pending, i+1)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike.
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        bio.write(bytearray(b'bar'))
        self.assertEqual(bio.read(), b'bar')
        bio.write(memoryview(b'baz'))
        self.assertEqual(bio.read(), b'baz')

    def test_error_types(self):
        # write() rejects str, None, bool and int.
        bio = ssl.MemoryBIO()
        self.assertRaises(TypeError, bio.write, 'foo')
        self.assertRaises(TypeError, bio.write, None)
        self.assertRaises(TypeError, bio.write, True)
        self.assertRaises(TypeError, bio.write, 1)
1658
1659
Martin Panter3840b2a2016-03-27 01:53:46 +00001660class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001661 """Tests that connect to a simple server running in the background"""
1662
def setUp(self):
    # Start a background echo server for each test, record its address in
    # self.server_addr, and make sure it is shut down afterwards.
    server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, server.port)
    server.__enter__()
    self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001668
def test_connect(self):
    # Without verification the peer certificate is reported as empty.
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_NONE) as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
        self.assertFalse(s.server_side)

    # this should succeed because we specify the root cert
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as s:
        s.connect(self.server_addr)
        self.assertTrue(s.getpeercert())
        self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001683
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(s.close)
    self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                           s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001693
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED,
                         ca_certs=SIGNING_CA)
    self.addCleanup(s.close)
    # 0 means the connection succeeded.
    self.assertEqual(0, s.connect_ex(self.server_addr))
    self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001702
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED,
                         ca_certs=SIGNING_CA,
                         do_handshake_on_connect=False)
    self.addCleanup(s.close)
    s.setblocking(False)
    rc = s.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [s], [], 5.0)
    # Non-blocking handshake
    while True:
        try:
            s.do_handshake()
            break
        except ssl.SSLWantReadError:
            select.select([s], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [s], [], 5.0)
    # SSL established
    self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001728
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
    # Same with a server hostname
    with ctx.wrap_socket(socket.socket(socket.AF_INET),
                         server_hostname="dummy") as s:
        s.connect(self.server_addr)
    ctx.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    ctx.load_verify_locations(SIGNING_CA)
    with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
        s.connect(self.server_addr)
        cert = s.getpeercert()
        self.assertTrue(cert)
1746
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    s = ctx.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(s.close)
    self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                           s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001757
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The same scenario is run with a str and with a bytes `capath`.
    for capath in (CAPATH, BYTES_CAPATH):
        with self.subTest(capath=capath):
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=capath)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(self.server_addr)
                cert = s.getpeercert()
                self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001779
def test_connect_cadata(self):
    # Verify the server certificate using CA data passed in memory, first
    # as PEM text and then as DER bytes.
    with open(SIGNING_CA) as f:
        pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for label, cadata in (("PEM", pem), ("DER", der)):
        with self.subTest(cadata=label):
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
            ctx.verify_mode = ssl.CERT_REQUIRED
            # TODO: fix TLSv1.3 support
            ctx.options |= ssl.OP_NO_TLSv1_3
            ctx.load_verify_locations(cadata=cadata)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(self.server_addr)
                cert = s.getpeercert()
                self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001804
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Make sure the socket is released even if connect() or an assertion
    # below fails; closing it a second time is harmless.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001823
def test_non_blocking_handshake(self):
    # Drive the handshake by hand on an already connected, non-blocking
    # socket, waiting with select() whenever the SSL layer asks for I/O.
    sock = socket.socket(socket.AF_INET)
    # Registered up front so the plain socket is not leaked if connect()
    # or wrapping fails.
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    sock.setblocking(False)
    s = test_wrap_socket(sock,
                         cert_reqs=ssl.CERT_NONE,
                         do_handshake_on_connect=False)
    self.addCleanup(s.close)
    count = 0
    while True:
        try:
            count += 1
            s.do_handshake()
            break
        except ssl.SSLWantReadError:
            select.select([s], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [s], [])
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001844
def test_get_server_certificate(self):
    # Delegates to the shared helper, pointed at the background server and
    # verifying against SIGNING_CA.
    _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001847
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001852
def test_ciphers(self):
    # Valid cipher strings allow the connection to go through.
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
        s.connect(self.server_addr)
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
        s.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as sock:
            s = test_wrap_socket(sock,
                                 cert_reqs=ssl.CERT_NONE,
                                 ciphers="^$:,;?*'dorothyx")
            s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001866
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    # Before any connection is made, no CA certificate is reported.
    self.assertEqual(context.get_ca_certs(), [])
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname='localhost') as s:
        s.connect(self.server_addr)
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert)
    # After the connection, exactly one CA certificate is reported.
    loaded_cas = context.get_ca_certs()
    self.assertEqual(len(loaded_cas), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001878
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    def make_context():
        # Client context that verifies against the CAPATH directory.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(capath=CAPATH)
        return ctx

    original_ctx = make_context()
    replacement_ctx = make_context()
    raw_sock = socket.socket(socket.AF_INET)
    with original_ctx.wrap_socket(raw_sock,
                                  server_hostname='localhost') as ss:
        ss.connect(self.server_addr)
        # Both the socket and its underlying SSL object start out on the
        # context that created them ...
        self.assertIs(ss.context, original_ctx)
        self.assertIs(ss._sslobj.context, original_ctx)
        # ... and both follow a reassignment of the context attribute.
        ss.context = replacement_ctx
        self.assertIs(ss.context, replacement_ctx)
        self.assertIs(ss._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001894
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    # A simple IO loop for an SSL object backed by memory BIOs.
    #
    # Repeatedly call func(*args).  After every call, whatever is in the
    # *outgoing* BIO is sent on *sock*.  When the call asked for more input
    # (WANT_READ), data received from *sock* is fed into the *incoming* BIO,
    # or EOF is signalled on it if the peer sent nothing.  Any other
    # ssl.SSLError propagates to the caller.
    #
    # The only keyword argument consulted is 'timeout' (seconds, default 10);
    # once it has elapsed the test is failed.  Returns func's return value.
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        wanted = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in (ssl.SSL_ERROR_WANT_READ,
                                 ssl.SSL_ERROR_WANT_WRITE):
                raise
            wanted = exc.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        sock.sendall(outgoing.read())
        # If there's no error, we're done.
        if wanted is None:
            break
        # For WANT_READ, we need to get data from the socket and put it in
        # the incoming BIO.
        if wanted == ssl.SSL_ERROR_WANT_READ:
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return ret
1931
def test_bio_handshake(self):
    # Drive a client handshake and shutdown through memory BIOs, checking
    # what the SSL object reports before and after the handshake.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    ctx.load_verify_locations(SIGNING_CA)
    sslobj = ctx.wrap_bio(incoming, outgoing,
                          server_side=False,
                          server_hostname=SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # Before the handshake: no negotiated parameters are available.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertRaises(ValueError, sslobj.getpeercert)
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # After the handshake: everything is populated.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # Writing after the shutdown attempt must fail.
    self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1965
def test_bio_read_write_data(self):
    # Send one request through a memory-BIO SSL object and check that the
    # server's reply is the lower-cased request.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_NONE
    sslobj = ctx.wrap_bio(incoming, outgoing, False)

    def pump(operation, *op_args):
        # Run one SSL operation to completion over the socket.
        return self.ssl_io_loop(sock, incoming, outgoing,
                                operation, *op_args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001981
1982
class NetworkedTests(unittest.TestCase):
    # Tests that talk to hosts on the public internet; each one runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        remote_addr = (REMOTE_HOST, 443)
        with support.transient_internet(REMOTE_HOST):
            raw_sock = socket.socket(socket.AF_INET)
            s = test_wrap_socket(raw_sock,
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 do_handshake_on_connect=False)
            self.addCleanup(s.close)
            # Tiny timeout so that the connection attempt times out.
            s.settimeout(0.0000001)
            rc = s.connect_ex(remote_addr)
            if rc == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate-retrieval helpers against an IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2004
Martin Panter3840b2a2016-03-27 01:53:46 +00002005
def _test_get_server_certificate(test, host, port, cert=None):
    """Check that a PEM certificate can be fetched from (host, port).

    The certificate is retrieved twice: first without a CA file, then with
    *cert* passed as ``ca_certs``.  *test* is the running TestCase; it is
    failed if either retrieval returns nothing.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2016
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA file fails.

    *test* is the running TestCase; it is failed if the retrieval from
    (host, port) succeeds instead of raising ssl.SSLError.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as x:
        # This is the expected outcome; only report it when verbose.
        if support.verbose:
            sys.stdout.write("%s\n" % x)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2026
2027
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002028from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002029
class ThreadedEchoServer(threading.Thread):
    # Echo server used by the tests, running in its own daemon thread.
    #
    # It listens on a port obtained from support.bind_port() and hands each
    # accepted connection to a ConnectionHandler thread.  run() joins every
    # handler before accepting again, so connections are served one at a
    # time.  Use it as a context manager: __enter__ starts the thread and
    # waits until it is listening, __exit__ stops and joins it.
    #
    # Results are collected on the instance for the tests to inspect:
    # selected_npn_protocols, selected_alpn_protocols, shared_ciphers and
    # conn_errors (error strings from failed TLS wraps).

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            # server: the owning ThreadedEchoServer.
            # connsock / addr: the accepted socket and the peer address.
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is plain text; the wrapped socket
            # once wrap_conn() has succeeded.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            # Wrap self.sock with the server's context (server side) and
            # record the negotiated NPN/ALPN protocols and shared ciphers
            # on the server.  Returns True on success.  On failure the
            # error text is recorded in server.conn_errors, the whole
            # server is stopped, this connection is closed and False is
            # returned.
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    # handle_error() is defined elsewhere in this file;
                    # presumably it reports the current exception --
                    # TODO confirm.
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    # Fetch the client certificate in both decoded and
                    # binary form; both are only reported when verbose.
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            # Read from the TLS connection when wrapped, else from the
            # plain socket (at most 1024 bytes in that case).
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            # Write through the TLS connection when wrapped, else through
            # the plain socket.
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            # Close whichever of the wrapped / plain socket is in use.
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            # Per-connection command loop.  Unless the server is in
            # STARTTLS mode the connection is wrapped right away.  Then
            # each message read is dispatched on its stripped content:
            #   empty           -> EOF: unwrap if possible, close, finish
            #   b'over'         -> close the connection and return
            #   b'STARTTLS'     -> (STARTTLS mode) reply OK, then wrap
            #   b'ENDTLS'       -> (STARTTLS mode, wrapped) reply OK, unwrap
            #   b'CB tls-unique'-> reply with repr() of the channel binding
            #   anything else   -> echo the message back lower-cased
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            # Clean TLS shutdown: continue on the plain
                            # socket so that close() closes self.sock.
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # The OK is sent before wrapping, i.e. in clear
                        # text when the connection is not yet wrapped.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        # The OK is sent over TLS, then the connection
                        # drops back to the plain socket.
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        # When *context* is given it is used as is.  Otherwise a new
        # SSLContext is built from ssl_version (default
        # PROTOCOL_TLS_SERVER), certreqs (default CERT_NONE), cacerts,
        # certificate, npn_protocols, alpn_protocols and ciphers.
        # chatty / connectionchatty control diagnostic output;
        # starttls_server makes handlers start in clear text and wait for
        # a STARTTLS command.
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        # The listening socket is bound here, so self.port is known
        # before the thread is started.
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results, appended to by ConnectionHandler.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        # Ask the accept loop to finish and wait for the thread to end.
        self.stop()
        self.join()

    def start(self, flag=None):
        # *flag*, if given, is an event that run() sets once the server
        # is listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        # Accept loop.  The short socket timeout makes accept() return
        # regularly so that a stop() request is noticed.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for this connection to finish before accepting
                # the next one.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        # Only clears the flag; run() exits at its next loop check.
        self.active = False
2248
class AsyncoreEchoServer(threading.Thread):
    # Echo server thread that runs an asyncore loop around an EchoServer
    # dispatcher.  Use it as a context manager: __enter__ starts the thread
    # and waits until the loop is about to run, __exit__ stops and joins it
    # and then closes every remaining asyncore channel.

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        # Listening dispatcher: creates a ConnectionHandler for every
        # accepted connection.

        class ConnectionHandler(asyncore.dispatcher_with_send):
            # Handles one accepted connection: performs the server-side
            # handshake without blocking, then echoes received data back
            # lower-cased.

            def __init__(self, conn, certfile):
                # The handshake is not done on wrap; it is driven by
                # _do_ssl_handshake(), first here and then from
                # handle_read() until it completes.
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Process data still pending inside the SSL socket before
                # reporting the channel as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # One handshake step.  Want-read / want-write means "try
                # again later"; EOF or ECONNABORTED closes the channel;
                # other SSL errors propagate.  _ssl_accepting is cleared
                # only when do_handshake() returns without raising.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # NOTE(review): an OSError other than ECONNABORTED is
                    # silently dropped here and the handshake stays
                    # pending -- confirm this is intended.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                # While the handshake is pending, readable events advance
                # it; afterwards they are echo requests.
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception being handled instead of using
                # asyncore's default error handling.
                raise

        def __init__(self, certfile):
            # Bind a listening socket; the chosen port is kept in
            # self.port.
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler is not stored here; asyncore's channel map is
            # what keeps it reachable (see the close_all() call in
            # AsyncoreEchoServer.__exit__).
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception being handled instead of using
            # asyncore's default error handling.
            raise

    def __init__(self, certfile):
        # The EchoServer (and its listening socket) is created here, so
        # self.port is known before the thread starts.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() sets the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        # *flag*, if given, is an event that run() sets before entering
        # the asyncore loop.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # NOTE(review): this bare except swallows every exception
                # raised out of the asyncore loop, including those
                # re-raised by the handle_error() overrides above.  It
                # looks like deliberate best-effort for the test harness,
                # but it can hide real failures -- confirm.
                pass

    def stop(self):
        # Clear the loop flag and close the listening dispatcher.
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002366
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Run an echo round-trip between a client and a ThreadedEchoServer.

    The server is created from *server_context*; the client socket is
    wrapped with *client_context*, using *sni_name* as server hostname and
    *session* as the session to resume.  *indata* is sent as bytes,
    bytearray and memoryview in turn, and every reply must equal
    ``indata.lower()``, otherwise AssertionError is raised.

    Returns a dict describing the connection as seen by the client
    (compression, cipher, peer certificate, ALPN/NPN protocol, version,
    session data) plus what the server recorded (ALPN/NPN protocols and
    shared ciphers).
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the server-side handler that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
                'session_reused': s.session_reused,
                'session': s.session,
            }
            s.close()
        # Add what the server side recorded for this run.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002416
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Connect a *client_protocol* client to a *server_protocol* server.

    A true *expect_success* means the connection has to work, a false one
    means it has to fail; AssertionError is raised when a connection that
    should fail succeeds.  When *expect_success* is a string, it must also
    equal the protocol version reported for the connection.

    *certsreqs* (default ssl.CERT_NONE) becomes the verify mode of both
    contexts; *server_options* and *client_options* are OR-ed into the
    options of the respective context.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        # Only a connection reset counts as an expected failure.
        if not (not expect_success and e.errno == errno.ECONNRESET):
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (client_name, server_name))
        if expect_success is not True:
            negotiated = stats['version']
            if expect_success != negotiated:
                raise AssertionError("version mismatch: expected %r, got %r"
                                     % (expect_success, negotiated))
2475
2476
2477class ThreadedTests(unittest.TestCase):
2478
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Every general-purpose protocol must work with one context used on
    # both sides; the client-only / server-only protocols are covered
    # separately below.
    one_sided = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in one_sided:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # The matching pair must connect.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Mismatched pairs must fail on the client side.  Columns: subTest
    # client label, subTest server label, context used for the client,
    # context used for the server, SNI name.
    # NOTE(review): the last row is labelled client=TLS_CLIENT /
    # server=TLS_CLIENT but passes server_context as the client context,
    # which makes it the first row without SNI.  Kept as found -- confirm
    # whether client_context was meant on both sides.
    failing_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, hostname),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, None),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, None),
    )
    for client_label, server_label, as_client, as_server, sni in failing_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   sni_name=sni)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2526
def test_getpeercert(self):
    # Check getpeercert() before and after the handshake, and sanity-check
    # the contents of the certificate the server presents.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        client_sock = client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname)
        with client_sock as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            self.assertRaises(ValueError, s.getpeercert)
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            expected_org = (('organizationName', 'Python Software Foundation'),)
            if expected_org not in cert['subject']:
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            # The validity period must be present and well ordered.
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            not_before = ssl.cert_time_to_seconds(cert['notBefore'])
            not_after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002562
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF before and after a CRL is loaded."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def assert_handshake_yields_cert():
        # Start a fresh echo server, connect to it and require that a
        # peer certificate is reported.
        echo_server = ThreadedEchoServer(context=server_context, chatty=True)
        with echo_server, \
                client_context.wrap_socket(socket.socket(),
                                           server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            peer_cert = sock.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # With the default verify flags the handshake goes through.
    assert_handshake_yields_cert()

    # Asking for a leaf CRL check while no CRL has been loaded must
    # make certificate verification fail.
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as sock:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            sock.connect((HOST, echo_server.port))

    # After loading the CRL file (signed by the CA) the handshake
    # succeeds again.
    client_context.load_verify_locations(CRLFILE)
    assert_handshake_yields_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002604
def test_check_hostname(self):
    """Hostname checking: matching name, mismatching name, missing name."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # A server_hostname matching the certificate verifies fine.
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as sock:
        sock.connect((HOST, echo_server.port))
        peer_cert = sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # A server_hostname the certificate is not valid for is rejected
    # during connect().
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as sock:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            sock.connect((HOST, echo_server.port))

    # Leaving out server_hostname altogether is refused by wrap_socket()
    # itself.
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, socket.socket() as plain_sock:
        with self.assertRaisesRegex(
                ValueError,
                "check_hostname requires server_hostname"):
            client_context.wrap_socket(plain_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002637
def test_ecc_cert(self):
    """Handshake with a server that only offers an ECDSA certificate.

    The client is restricted to ECDHE/ECDSA suites (plus two TLS 1.3
    suites) and must end up with a verified peer certificate and, for
    pre-TLS 1.3 connections, an ECDHE-ECDSA cipher suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers(
        'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
        'ECDHE:ECDSA:!NULL:!aRSA'
    )
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) treated the
            # expected value as the failure message and so verified
            # nothing.  TLS 1.3 suite names do not encode the key
            # exchange or signature algorithm, so only compare the
            # prefix for older protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2661
def test_dual_rsa_ecc(self):
    """Server with both ECC and RSA certs must serve an ECDSA-only client.

    The server context loads an ECC and an RSA key/cert pair; the
    client only allows ECDHE/ECDSA suites, so the negotiated cipher
    suite has to be an ECDHE-ECDSA one.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertTrue(x, expected) only used `expected` as the failure
            # message; compare the suite name prefix for real.  TLS 1.3
            # is disabled above, so the name has the KX-AUTH-... form.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2687
def test_check_hostname_idn(self):
    """Hostname checking with internationalized (IDN) server names."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)
    # TODO: fix TLSv1.3 support
    server_context.options |= ssl.OP_NO_TLSv1_3

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.check_hostname = True
    client_context.load_verify_locations(SIGNING_CA)

    # (name handed to the client, name expected on socket.server_hostname).
    # Each host is given in several spellings that must all verify.
    idn_hostnames = (
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # The Unicode spelling of the IDNA 2008 name is left disabled:
        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
    )
    for given_name, wanted_name in idn_hostnames:
        echo_server = ThreadedEchoServer(context=server_context, chatty=True)
        with echo_server:
            address = (HOST, echo_server.port)

            # Through the context: the normalized name is visible both
            # before and after the handshake.
            with client_context.wrap_socket(
                    socket.socket(), server_hostname=given_name) as sock:
                self.assertEqual(sock.server_hostname, wanted_name)
                sock.connect(address)
                peer_cert = sock.getpeercert()
                self.assertEqual(sock.server_hostname, wanted_name)
                self.assertTrue(peer_cert, "Can't get peer certificate.")

            # Through a directly constructed SSLSocket.
            with ssl.SSLSocket(socket.socket(),
                               server_hostname=given_name) as sock:
                sock.connect(address)
                sock.getpeercert()
                self.assertEqual(sock.server_hostname, wanted_name)

    # A hostname unrelated to the certificate must be rejected.
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, \
            client_context.wrap_socket(
                socket.socket(),
                server_hostname="python.example.org") as sock:
        with self.assertRaises(ssl.CertificateError):
            sock.connect((HOST, echo_server.port))
2751
def test_wrong_cert(self):
    """A server requiring client certs must reject a wrong client cert.

    The server runs with CERT_REQUIRED; the client presents WRONG_CERT
    and the connection attempt is expected to fail.
    """
    client_context, server_context, hostname = testing_context()
    # The client presents a certificate the server should not accept.
    client_context.load_cert_chain(WRONG_CERT)
    # The server insists on TLS client authentication.
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TODO: fix TLSv1.3 support
    # With TLS 1.3, test fails with exception in server thread
    server_context.options |= ssl.OP_NO_TLSv1_3

    echo_server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with echo_server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as sock:
            # Acceptable outcomes: an SSL error because the server
            # rejected the connection, or a low-level connection reset
            # (which sometimes happens on Windows).
            try:
                sock.connect((HOST, echo_server.port))
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2789
def test_rude_shutdown(self):
    """An abruptly closed server socket makes the client handshake
    fail with OSError.
    """
    ready_to_accept = threading.Event()
    server_closed = threading.Event()

    listening_sock = socket.socket()
    port = support.bind_port(listening_sock, HOST)

    def listener():
        # Runs in a helper thread: accept a single connection, drop it
        # and the listening socket right away, then tell the main
        # thread that everything is gone.
        listening_sock.listen()
        ready_to_accept.set()
        accepted, _peer = listening_sock.accept()
        accepted.close()
        listening_sock.close()
        server_closed.set()

    def connector():
        # Runs in the main thread: connect in clear text, wait for the
        # peer to vanish and only then try to wrap the socket.
        ready_to_accept.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            server_closed.wait()
            try:
                wrapped = test_wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002830
def test_ssl_cert_verify_error(self):
    """Verification failures are reported as SSLCertVerificationError.

    The client context has no CA certificates loaded, so verifying the
    server certificate has to fail; the raised exception must carry the
    verify code and message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # Deliberately no load_verify_locations(): nothing is trusted.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises (rather than a bare try/except) makes the
            # test fail if the handshake unexpectedly succeeds.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
2853
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")

    # SSLv2 client against SSLv2 server: default certreqs first, then
    # the two explicit certificate requirements.
    for certreqs_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True,
                           *certreqs_args)

    # Every other client protocol is expected to fail.
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02002877
@skip_if_broken_ubuntu_ssl
def test_PROTOCOL_TLS(self):
    """Connecting to a PROTOCOL_TLS (SSLv23) server with various clients"""
    if support.verbose:
        sys.stdout.write("\n")

    sslv3_available = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
        except OSError as exc:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(exc))

    # The same three client protocols, first with the default certreqs
    # and then with CERT_OPTIONAL and CERT_REQUIRED.
    for certreqs_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if sslv3_available:
            try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
                               *certreqs_args)
        try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
                           *certreqs_args)
        try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1',
                           *certreqs_args)

    # Server with specific SSL options
    if sslv3_available:
        try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    no_legacy_ssl = ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
                       server_options=no_legacy_ssl)
    try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2916
2917
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")

    # Matching protocol: default certreqs, then the explicit ones.
    for certreqs_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3',
                           *certreqs_args)

    # Mismatching clients are expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
2937
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")

    # Matching protocol: default certreqs, then the explicit ones.
    for certreqs_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1',
                           *certreqs_args)

    # Legacy SSL clients, where compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(ssl.PROTOCOL_TLSv1,
                               getattr(ssl, legacy_name), False)

    try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
2952
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Also checks that older TLS/SSL versions do not interoperate."""
    if support.verbose:
        sys.stdout.write("\n")

    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')

    # Legacy SSL clients, where compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(ssl.PROTOCOL_TLSv1_1,
                               getattr(ssl, legacy_name), False)

    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')

    # TLSv1 and TLSv1.1 endpoints must not talk to each other.
    for server_name, client_name in (
            ('PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1'),
            ('PROTOCOL_TLSv1', 'PROTOCOL_TLSv1_1')):
        try_protocol_combo(getattr(ssl, server_name),
                           getattr(ssl, client_name), False)
2972
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Also checks that older TLS/SSL versions do not interoperate."""
    if support.verbose:
        sys.stdout.write("\n")

    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
                       server_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2)

    # Legacy SSL clients, where compiled in, must be refused.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(ssl.PROTOCOL_TLSv1_2,
                               getattr(ssl, legacy_name), False)

    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')

    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1, whichever
    # side is the server.  Constants are looked up per iteration, in
    # the same order as the individual calls they replace.
    mismatched_pairs = (
        ('PROTOCOL_TLSv1_2', 'PROTOCOL_TLSv1'),
        ('PROTOCOL_TLSv1', 'PROTOCOL_TLSv1_2'),
        ('PROTOCOL_TLSv1_2', 'PROTOCOL_TLSv1_1'),
        ('PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1_2'),
    )
    for server_name, client_name in mismatched_pairs:
        try_protocol_combo(getattr(ssl, server_name),
                           getattr(ssl, client_name), False)
2996
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed sequence of messages to an echo server started with
    starttls_server=True.  The connection begins in clear text, is
    wrapped with TLS once the reply to b"STARTTLS" starts with b"ok",
    and is unwrapped again once the reply to b"ENDTLS" starts with
    b"ok".
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # Tracks which object is currently live: `conn` (TLS) when True,
    # the plain socket `s` when False.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            # Send the message and read the reply over whichever channel
            # is active.  `conn` is only bound after a successful STARTTLS.
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = test_wrap_socket(s)
                wrapped = True
            elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                # unwrap() hands back a socket for clear-text use; it
                # replaces `s` for the remaining messages.
                s = conn.unwrap()
                wrapped = False
            else:
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Send the final b"over\n" message on the active channel
        # (presumably the echo server's end-of-session marker -- confirm
        # against ThreadedEchoServer), then close that same channel.
        if wrapped:
            conn.write(b"over\n")
        else:
            s.send(b"over\n")
        if wrapped:
            conn.close()
        else:
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003053
def test_socketserver(self):
    """Fetch a file from a socketserver-based HTTPS server and compare
    it with the copy on disk."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')

    # Reference copy, read straight from the file system.
    with open(CERTFILE, 'rb') as local_file:
        expected = local_file.read()

    # now fetch the same data from the HTTPS server
    received = ''
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    with urllib.request.urlopen(url, context=context) as response:
        content_length = response.info().get("content-length")
        if content_length and (int(content_length) > 0):
            received = response.read(int(content_length))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(received), server))
    self.assertEqual(expected, received)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003079
def test_asyncore_server(self):
    """Round-trip one message through the asyncore-based echo server."""

    def log(template, *values):
        # Progress output, emitted only in verbose mode.
        if support.verbose:
            sys.stdout.write(template % values)

    log("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        log(" client: sending %r...\n", indata)
        client.write(indata)
        outdata = client.read()
        log(" client: read %r\n", outdata)
        # The reply is expected to be the lower-cased request.
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        client.write(b"over\n")
        log(" client: closing connection.\n")
        client.close()
        log(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003108
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send-style and recv-style method of a connected SSL socket
    is called once against an echo server.  Methods flagged as
    "expect success" must round-trip the data (the reply is compared
    with the lower-cased request); the others must raise a ValueError
    whose message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the !r / !s conversions.
        # The former ":r" / ":s" format specs are not accepted by bytes
        # or exception objects and raised TypeError while the message
        # was being built, hiding the real failure.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3244
def test_recv_zero(self):
    """Zero-length reads return no data and leave pending data alone."""
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv(0) and read(0) give back nothing; a following full read still
    # sees the echoed data.
    tls_sock.send(b"data")
    for zero_length_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_length_read(0), b"")
    self.assertEqual(tls_sock.read(), b"data")

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    self.assertEqual(tls_sock.recv_into(bytearray()), 0)
3264
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise SSLWantWriteError or
    # SSLWantReadError instead of blocking once send() can make no
    # further progress.
    server = ThreadedEchoServer(
        CERTFILE,
        certreqs=ssl.CERT_NONE,
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        cacerts=CERTFILE,
        chatty=True,
        connectionchatty=False,
    )
    with server:
        s = test_wrap_socket(
            socket.socket(),
            server_side=False,
            certfile=CERTFILE,
            ca_certs=CERTFILE,
            cert_reqs=ssl.CERT_NONE,
            ssl_version=ssl.PROTOCOL_TLS_CLIENT,
        )
        s.connect((HOST, server.port))
        s.setblocking(False)

        # Keep pushing data: sooner or later the buffers fill up and the
        # call would have to block, which must surface as an exception.
        chunk = bytearray(8192)
        expected_errors = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(expected_errors):
            while True:
                s.send(chunk)

        # Switch back to blocking mode before closing the connection.
        s.setblocking(True)
        s.close()
3294
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout.
    # The listener below accepts TCP connections but never sends
    # anything, so a client-side handshake can only end by timing out.
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    stop_serving = False

    def serve():
        listener.listen()
        ready.set()
        accepted = []
        while not stop_serving:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Case 1: connect first, then wrap the connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect through the SSL socket.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell the serving loop to exit, then wait for it before closing
        # the listening socket it is polling.
        stop_serving = True
        worker.join()
        listener.close()
3343
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    host = "127.0.0.1"
    server = socket.socket(socket.AF_INET)
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() and wait until the
        # client closes the connection.
        listening.set()
        remote, peer = server.accept()
        remote.recv(1)

    server_thread = threading.Thread(target=serve)
    server_thread.start()
    # The client waits for the server to be set up, then connects.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    server_thread.join()
    remote.close()
    server.close()
    # Sanity checks: accept() returned an SSL socket, and the peer
    # address it reported is the client's own local address.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3382
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but unconnected socket must fail with
    # OSError(ENOTCONN).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as sock:
        with self.assertRaises(OSError) as caught:
            sock.getpeercert()
        raised_errno = caught.exception.errno
        self.assertEqual(raised_errno, errno.ENOTCONN)
3389
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but unconnected socket must fail with
    # OSError(ENOTCONN).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as sock:
        with self.assertRaises(OSError) as caught:
            sock.do_handshake()
        raised_errno = caught.exception.errno
        self.assertEqual(raised_errno, errno.ENOTCONN)
3396
def test_default_ciphers(self):
    # A client restricted to DES ciphers must fail to connect to a
    # server using its default cipher list, and the server must record
    # a "no shared cipher" error.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")

    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS,
                                     chatty=False)
    with echo_server as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Checked after leaving the server's context.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3411
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is no TLS session at all.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # Use IS_OPENSSL_1_1_1 rather than the raw version tuple:
            # LibreSSL reports a 2.x version, which would otherwise
            # compare as ">= 1.1.1".  Also require that TLS 1.3 is
            # actually compiled in.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # After the socket is closed the SSL object is gone again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3435
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With every older protocol version disabled, the connection must
    # negotiate TLS 1.3 with one of the TLS 1.3 cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    context.options |= (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    # Early OpenSSL 1.1.1 development builds named the suites "TLS13-*";
    # released versions use the "TLS_*" names.  Accept both spellings so
    # the test does not depend on the exact OpenSSL build.
    tls13_ciphers = {
        'TLS13-AES-256-GCM-SHA384',
        'TLS13-CHACHA20-POLY1305-SHA256',
        'TLS13-AES-128-GCM-SHA256',
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], tls13_ciphers)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003453
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3473
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    # TODO: fix TLSv1.3 support
    client_context.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the data of *this* connection; the first
            # connection's cb_data was already validated above.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003529
def test_compression(self):
    # The negotiated compression method is either absent or one of the
    # known compression names.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, {None, 'ZLIB', 'RLE'})
3538
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both sides, no compression method
    # may be negotiated.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3549
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; cipher[0] would only be its
        # first character.
        self.fail("Non-DH cipher: " + cipher)
3565
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on the server, on the client, and with
    # deliberately mismatching curves on both sides.
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_legacy_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_legacy_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_legacy_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_legacy_tls
    handshake_failed = False
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        handshake_failed = True
    # OpenSSL 1.0.2 does not fail although it should.
    if not handshake_failed and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3604
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3612
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server offers ALPN protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3622
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected against a fixed server protocol list.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (client protocol list, expected selected protocol)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # A failed handshake is kept as the "result" so that it can be
        # inspected below.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as exc:
            stats = exc

        handshake_error_expected = (
            expected is None
            and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # The last protocol recorded by the server is the one to check.
        recorded = stats['server_alpn_protocols']
        if len(recorded):
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3662
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
3670
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected against a fixed server protocol list.
    server_protocols = ['http/1.1', 'spdy/2']
    # (client protocol list, expected selected protocol)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # The last protocol recorded by the server is the one to check.
        recorded = stats['server_npn_protocols']
        if len(recorded):
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3696
def sni_contexts(self):
    # Build the contexts shared by the SNI tests.  Returns a
    # (server_context, other_context, client_context) tuple: two server
    # contexts loaded with different certificates and a client context
    # that trusts the signing CA.
    def make_server_context(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = make_server_context(SIGNED_CERTFILE)
    other_context = make_server_context(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3705
def check_common_name(self, stats, name):
    # Assert that the peer certificate recorded in *stats* has a subject
    # entry whose commonName is *name*.
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
3709
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name and the
    # initial context, and may switch the connection to another context.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate of other_context was selected
    self.check_common_name(stats, 'fakehostname')

    # The callback is called with server_name=None
    del seen[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3750
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    contexts = self.sni_contexts()
    server_context, other_context, client_context = contexts

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003764
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    contexts = self.sni_contexts()
    server_context, other_context, client_context = contexts

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately trigger a ZeroDivisionError inside the callback.
        1 / 0

    server_context.set_servername_callback(cb_raising)

    with support.captured_stderr() as stderr:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    # The callback's exception is expected to show up on stderr.
    self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003781
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    contexts = self.sni_contexts()
    server_context, other_context, client_context = contexts

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with support.captured_stderr() as stderr:
        with self.assertRaises(ssl.SSLError) as caught:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    # The resulting TypeError is expected to show up on stderr.
    self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003799
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must belong to the
    # algorithm family the server was restricted to.
    client_context, server_context, hostname = testing_context()
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, _tls_version, _bits in ciphers:
        # A name is accepted if it has alg1 as a dash-separated
        # component or contains alg2 as a substring.
        if alg1 in name.split("-") or alg2 in name:
            continue
        self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003820
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that was connected and then
    # closed must raise ValueError.
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        closed_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        closed_sock.connect((HOST, server.port))
        closed_sock.close()

        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003833
def test_sendfile(self):
    # sendfile() on an SSL socket must transmit the file contents; the
    # test expects to read the same bytes back from the server.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    with ThreadedEchoServer(context=context, chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as src:
                s.sendfile(src)
            received = s.recv(1024)
            self.assertEqual(received, payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003850
def test_session(self):
    # Run four connections, alternating between fresh and resumed
    # sessions, and check the session objects and the server-side
    # accept/hit counters after each one.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    # first connection without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 1)
    self.assertEqual(counters['hits'], 0)

    # reuse session
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 2)
    self.assertEqual(counters['hits'], 1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, session.id)
    # Equal to the original session, but a distinct object.
    self.assertEqual(resumed, session)
    self.assertIsNot(resumed, session)
    self.assertGreaterEqual(resumed.time, session.time)
    self.assertGreaterEqual(resumed.timeout, session.timeout)

    # another one without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, session.id)
    self.assertNotEqual(fresh, session)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 3)
    self.assertEqual(counters['hits'], 1)

    # reuse session again
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    self.assertTrue(stats['session_reused'])
    resumed_again = stats['session']
    self.assertEqual(resumed_again.id, session.id)
    self.assertEqual(resumed_again, session)
    self.assertGreaterEqual(resumed_again.time, session.time)
    self.assertGreaterEqual(resumed_again.timeout, session.timeout)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 4)
    self.assertEqual(counters['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003908
def test_session_handling(self):
    """Check when ``SSLSocket.session`` may be read and assigned.

    Four client connections are made against one echo server:

    1. before the handshake ``session`` and ``session_reused`` are both
       ``None``; afterwards a session exists, and assigning something
       that is not a session raises ``TypeError``;
    2. assigning a session after the handshake raises ``ValueError``;
    3. assigning the session before connecting leads to a reused session;
    4. a session cannot be assigned to a socket created from a different
       client context (``ValueError``).
    """
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    for ctx in (client_context, client_context2):
        ctx.options |= ssl.OP_NO_TLSv1_3

    def wrap(context):
        # Fresh, unconnected client socket created from *context*.
        return context.wrap_socket(socket.socket(),
                                   server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with wrap(client_context) as sock:
            # nothing to report until the handshake has happened
            self.assertEqual(sock.session, None)
            self.assertEqual(sock.session_reused, None)
            sock.connect((HOST, server.port))
            session = sock.session
            self.assertTrue(session)
            # only session objects are accepted by the setter
            with self.assertRaises(TypeError) as caught:
                sock.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with wrap(client_context) as sock:
            sock.connect((HOST, server.port))
            # too late: the handshake is already done
            with self.assertRaises(ValueError) as caught:
                sock.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with wrap(client_context) as sock:
            # assigning ahead of connect() (and therefore ahead of the
            # handshake) is allowed and results in session reuse
            sock.session = session
            sock.connect((HOST, server.port))
            self.assertEqual(sock.session.id, session.id)
            self.assertEqual(sock.session, session)
            self.assertEqual(sock.session_reused, True)

        with wrap(client_context2) as sock:
            # a session is tied to the context that produced it
            with self.assertRaises(ValueError) as caught:
                sock.session = session
                # not reached when the assignment raises as expected
                sock.connect((HOST, server.port))
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003958
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003959
def test_main(verbose=False):
    """Run the SSL test classes after a few sanity checks.

    When ``support.verbose`` is set, first print the OpenSSL version, a
    description of the host platform and a few ``ssl`` feature flags.
    Every certificate fixture must exist on disk; otherwise
    ``support.TestFailed`` is raised before any test runs.
    ``NetworkedTests`` is only added when the 'network' resource is
    enabled.

    The *verbose* parameter is not read by this function; verbosity is
    taken from ``support.verbose``.
    """
    if support.verbose:
        import warnings
        plats = {
            # linux_distribution() is deprecated (hence the warning filter
            # below) and is missing from newer Python releases.  Look it up
            # defensively so that a verbose run does not abort with
            # AttributeError before a single test has executed.
            'Linux': getattr(platform, 'linux_distribution', None),
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            for name, func in plats.items():
                if func is None:
                    # probe not available on this Python; try the next one
                    continue
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # no probe produced a usable answer: use the generic string
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # this ssl build does not define OP_NO_TLSv1_1
            pass

    # Fail early, with a clear message, if a certificate fixture is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Pair threading_setup() with threading_cleanup() even if the run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004013
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()