blob: a48eb890da43c474902b2c8b95591653bb1b79f9 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
33IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Christian Heimesefff7062013-11-21 03:35:02 +010036def data_file(*name):
37 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Antoine Pitrou81564092010-10-08 23:06:24 +000039# The custom key and certificate files used in test_ssl are generated
40# using Lib/test/make_ssl_certs.py.
41# Other certificates are simply fetched from the Internet servers they
42# are meant to authenticate.
43
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000046ONLYCERT = data_file("ssl_cert.pem")
47ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000048BYTES_ONLYCERT = os.fsencode(ONLYCERT)
49BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020050CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
51ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
52KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000053CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000054BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010055CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
56CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
Christian Heimesbd5c7d22018-01-20 15:16:30 +010058CERTFILE_INFO = {
59 'issuer': ((('countryName', 'XY'),),
60 (('localityName', 'Castle Anthrax'),),
61 (('organizationName', 'Python Software Foundation'),),
62 (('commonName', 'localhost'),)),
63 'notAfter': 'Jan 17 19:09:06 2028 GMT',
64 'notBefore': 'Jan 19 19:09:06 2018 GMT',
65 'serialNumber': 'F9BA076D5B6ABD9B',
66 'subject': ((('countryName', 'XY'),),
67 (('localityName', 'Castle Anthrax'),),
68 (('organizationName', 'Python Software Foundation'),),
69 (('commonName', 'localhost'),)),
70 'subjectAltName': (('DNS', 'localhost'),),
71 'version': 3
72}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
Christian Heimes22587792013-11-21 23:56:13 +010074# empty CRL
75CRLFILE = data_file("revocation.crl")
76
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010077# Two keys and certs signed by the same CA (for SNI tests)
78SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020079SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
81SIGNED_CERTFILE_INFO = {
82 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
83 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
84 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
85 'issuer': ((('countryName', 'XY'),),
86 (('organizationName', 'Python Software Foundation CA'),),
87 (('commonName', 'our-ca-server'),)),
88 'notAfter': 'Nov 28 19:09:06 2027 GMT',
89 'notBefore': 'Jan 19 19:09:06 2018 GMT',
90 'serialNumber': '82EDBF41C880919C',
91 'subject': ((('countryName', 'XY'),),
92 (('localityName', 'Castle Anthrax'),),
93 (('organizationName', 'Python Software Foundation'),),
94 (('commonName', 'localhost'),)),
95 'subjectAltName': (('DNS', 'localhost'),),
96 'version': 3
97}
98
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010099SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
102SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
103
Martin Panter3840b2a2016-03-27 01:53:46 +0000104# Same certificate as pycacert.pem, but without extra text in file
105SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200106# cert with all kinds of subject alt names
107ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100108IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100109
Martin Panter3d81d932016-01-14 09:36:00 +0000110REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000111
112EMPTYCERT = data_file("nullcert.pem")
113BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000114NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000115BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200116NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200117NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000118
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400119DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100120BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
Christian Heimes358cfd42016-09-10 22:43:48 +0200122# Not defined in all versions of OpenSSL
123OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
124OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
125OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
126OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
127
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Thomas Woutersed03b412007-08-28 21:37:11 +0000129def handle_error(prefix):
130 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000131 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000132 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000133
Antoine Pitroub5218772010-05-21 09:56:06 +0000134def can_clear_options():
135 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200136 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000137
138def no_sslv2_implies_sslv3_hello():
139 # 0.9.7h or higher
140 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
141
Christian Heimes2427b502013-11-23 11:24:32 +0100142def have_verify_flags():
143 # 0.9.8 or higher
144 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
145
Antoine Pitrouc695c952014-04-28 20:57:36 +0200146def utc_offset(): #NOTE: ignore issues like #1647654
147 # local time = utc time + utc offset
148 if time.daylight and time.localtime().tm_isdst > 0:
149 return -time.altzone # seconds
150 return -time.timezone
151
Christian Heimes9424bb42013-06-17 15:32:57 +0200152def asn1time(cert_time):
153 # Some versions of OpenSSL ignore seconds, see #18207
154 # 0.9.8.i
155 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
156 fmt = "%b %d %H:%M:%S %Y GMT"
157 dt = datetime.datetime.strptime(cert_time, fmt)
158 dt = dt.replace(second=0)
159 cert_time = dt.strftime(fmt)
160 # %d adds leading zero but ASN1_TIME_print() uses leading space
161 if cert_time[4] == "0":
162 cert_time = cert_time[:4] + " " + cert_time[5:]
163
164 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000165
Antoine Pitrou23df4832010-08-04 17:14:06 +0000166# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
167def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200168 if hasattr(ssl, 'PROTOCOL_SSLv2'):
169 @functools.wraps(func)
170 def f(*args, **kwargs):
171 try:
172 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
173 except ssl.SSLError:
174 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
175 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
176 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
177 return func(*args, **kwargs)
178 return f
179 else:
180 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000181
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100182needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
Christian Heimesd0486372016-09-10 23:23:33 +0200185def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
186 cert_reqs=ssl.CERT_NONE, ca_certs=None,
187 ciphers=None, certfile=None, keyfile=None,
188 **kwargs):
189 context = ssl.SSLContext(ssl_version)
190 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200191 if cert_reqs == ssl.CERT_NONE:
192 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200193 context.verify_mode = cert_reqs
194 if ca_certs is not None:
195 context.load_verify_locations(ca_certs)
196 if certfile is not None or keyfile is not None:
197 context.load_cert_chain(certfile, keyfile)
198 if ciphers is not None:
199 context.set_ciphers(ciphers)
200 return context.wrap_socket(sock, **kwargs)
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
203def testing_context(server_cert=SIGNED_CERTFILE):
204 """Create context
205
206 client_context, server_context, hostname = testing_context()
207 """
208 if server_cert == SIGNED_CERTFILE:
209 hostname = SIGNED_CERTFILE_HOSTNAME
210 elif server_cert == SIGNED_CERTFILE2:
211 hostname = SIGNED_CERTFILE2_HOSTNAME
212 else:
213 raise ValueError(server_cert)
214
215 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
216 client_context.load_verify_locations(SIGNING_CA)
217
218 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
219 server_context.load_cert_chain(server_cert)
220
221 return client_context, server_context, hostname
222
223
Antoine Pitrou152efa22010-05-16 18:19:27 +0000224class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000225
Antoine Pitrou480a1242010-04-28 21:37:09 +0000226 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000227 ssl.CERT_NONE
228 ssl.CERT_OPTIONAL
229 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100230 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100231 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100232 if ssl.HAS_ECDH:
233 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100234 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
235 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000236 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100237 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700238 ssl.OP_NO_SSLv2
239 ssl.OP_NO_SSLv3
240 ssl.OP_NO_TLSv1
241 ssl.OP_NO_TLSv1_3
242 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
243 ssl.OP_NO_TLSv1_1
244 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200245 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Antoine Pitrou172f0252014-04-18 20:33:08 +0200247 def test_str_for_enums(self):
248 # Make sure that the PROTOCOL_* constants have enum-like string
249 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200250 proto = ssl.PROTOCOL_TLS
251 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200252 ctx = ssl.SSLContext(proto)
253 self.assertIs(ctx.protocol, proto)
254
Antoine Pitrou480a1242010-04-28 21:37:09 +0000255 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000256 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000258 sys.stdout.write("\n RAND_status is %d (%s)\n"
259 % (v, (v and "sufficient randomness") or
260 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200261
262 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
263 self.assertEqual(len(data), 16)
264 self.assertEqual(is_cryptographic, v == 1)
265 if v:
266 data = ssl.RAND_bytes(16)
267 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200268 else:
269 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200270
Victor Stinner1e81a392013-12-19 16:47:04 +0100271 # negative num is invalid
272 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
273 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
274
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100275 if hasattr(ssl, 'RAND_egd'):
276 self.assertRaises(TypeError, ssl.RAND_egd, 1)
277 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200279 ssl.RAND_add(b"this is a random bytes object", 75.0)
280 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000281
Christian Heimesf77b4b22013-08-21 13:26:05 +0200282 @unittest.skipUnless(os.name == 'posix', 'requires posix')
283 def test_random_fork(self):
284 status = ssl.RAND_status()
285 if not status:
286 self.fail("OpenSSL's PRNG has insufficient randomness")
287
288 rfd, wfd = os.pipe()
289 pid = os.fork()
290 if pid == 0:
291 try:
292 os.close(rfd)
293 child_random = ssl.RAND_pseudo_bytes(16)[0]
294 self.assertEqual(len(child_random), 16)
295 os.write(wfd, child_random)
296 os.close(wfd)
297 except BaseException:
298 os._exit(1)
299 else:
300 os._exit(0)
301 else:
302 os.close(wfd)
303 self.addCleanup(os.close, rfd)
304 _, status = os.waitpid(pid, 0)
305 self.assertEqual(status, 0)
306
307 child_random = os.read(rfd, 16)
308 self.assertEqual(len(child_random), 16)
309 parent_random = ssl.RAND_pseudo_bytes(16)[0]
310 self.assertEqual(len(parent_random), 16)
311
312 self.assertNotEqual(child_random, parent_random)
313
Antoine Pitrou480a1242010-04-28 21:37:09 +0000314 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000315 # note that this uses an 'unofficial' function in _ssl.c,
316 # provided solely for this test, to exercise the certificate
317 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100318 self.assertEqual(
319 ssl._ssl._test_decode_cert(CERTFILE),
320 CERTFILE_INFO
321 )
322 self.assertEqual(
323 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
324 SIGNED_CERTFILE_INFO
325 )
326
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200327 # Issue #13034: the subjectAltName in some certificates
328 # (notably projects.developer.nokia.com:443) wasn't parsed
329 p = ssl._ssl._test_decode_cert(NOKIACERT)
330 if support.verbose:
331 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
332 self.assertEqual(p['subjectAltName'],
333 (('DNS', 'projects.developer.nokia.com'),
334 ('DNS', 'projects.forum.nokia.com'))
335 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100336 # extra OCSP and AIA fields
337 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
338 self.assertEqual(p['caIssuers'],
339 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
340 self.assertEqual(p['crlDistributionPoints'],
341 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000342
Christian Heimes824f7f32013-08-17 00:54:47 +0200343 def test_parse_cert_CVE_2013_4238(self):
344 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
345 if support.verbose:
346 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
347 subject = ((('countryName', 'US'),),
348 (('stateOrProvinceName', 'Oregon'),),
349 (('localityName', 'Beaverton'),),
350 (('organizationName', 'Python Software Foundation'),),
351 (('organizationalUnitName', 'Python Core Development'),),
352 (('commonName', 'null.python.org\x00example.org'),),
353 (('emailAddress', 'python-dev@python.org'),))
354 self.assertEqual(p['subject'], subject)
355 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200356 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
357 san = (('DNS', 'altnull.python.org\x00example.com'),
358 ('email', 'null@python.org\x00user@example.org'),
359 ('URI', 'http://null.python.org\x00http://example.org'),
360 ('IP Address', '192.0.2.1'),
361 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
362 else:
363 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
364 san = (('DNS', 'altnull.python.org\x00example.com'),
365 ('email', 'null@python.org\x00user@example.org'),
366 ('URI', 'http://null.python.org\x00http://example.org'),
367 ('IP Address', '192.0.2.1'),
368 ('IP Address', '<invalid>'))
369
370 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200371
Christian Heimes1c03abd2016-09-06 23:25:35 +0200372 def test_parse_all_sans(self):
373 p = ssl._ssl._test_decode_cert(ALLSANFILE)
374 self.assertEqual(p['subjectAltName'],
375 (
376 ('DNS', 'allsans'),
377 ('othername', '<unsupported>'),
378 ('othername', '<unsupported>'),
379 ('email', 'user@example.org'),
380 ('DNS', 'www.example.org'),
381 ('DirName',
382 ((('countryName', 'XY'),),
383 (('localityName', 'Castle Anthrax'),),
384 (('organizationName', 'Python Software Foundation'),),
385 (('commonName', 'dirname example'),))),
386 ('URI', 'https://www.python.org/'),
387 ('IP Address', '127.0.0.1'),
388 ('IP Address', '0:0:0:0:0:0:0:1\n'),
389 ('Registered ID', '1.2.3.4.5')
390 )
391 )
392
Antoine Pitrou480a1242010-04-28 21:37:09 +0000393 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000394 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000395 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000396 d1 = ssl.PEM_cert_to_DER_cert(pem)
397 p2 = ssl.DER_cert_to_PEM_cert(d1)
398 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000399 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000400 if not p2.startswith(ssl.PEM_HEADER + '\n'):
401 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
402 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
403 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000404
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000405 def test_openssl_version(self):
406 n = ssl.OPENSSL_VERSION_NUMBER
407 t = ssl.OPENSSL_VERSION_INFO
408 s = ssl.OPENSSL_VERSION
409 self.assertIsInstance(n, int)
410 self.assertIsInstance(t, tuple)
411 self.assertIsInstance(s, str)
412 # Some sanity checks follow
413 # >= 0.9
414 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400415 # < 3.0
416 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000417 major, minor, fix, patch, status = t
418 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400419 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000420 self.assertGreaterEqual(minor, 0)
421 self.assertLess(minor, 256)
422 self.assertGreaterEqual(fix, 0)
423 self.assertLess(fix, 256)
424 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100425 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 self.assertGreaterEqual(status, 0)
427 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200429 if IS_LIBRESSL:
430 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100431 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400432 else:
433 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100434 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435
Antoine Pitrou9d543662010-04-23 23:10:32 +0000436 @support.cpython_only
437 def test_refcycle(self):
438 # Issue #7943: an SSL object doesn't create reference cycles with
439 # itself.
440 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200441 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000442 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100443 with support.check_warnings(("", ResourceWarning)):
444 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100445 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000446
Antoine Pitroua468adc2010-09-14 14:43:44 +0000447 def test_wrapped_unconnected(self):
448 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200449 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000450 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200451 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100452 self.assertRaises(OSError, ss.recv, 1)
453 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
454 self.assertRaises(OSError, ss.recvfrom, 1)
455 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
456 self.assertRaises(OSError, ss.send, b'x')
457 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000458
Antoine Pitrou40f08742010-04-24 22:04:40 +0000459 def test_timeout(self):
460 # Issue #8524: when creating an SSL socket, the timeout of the
461 # original socket should be retained.
462 for timeout in (None, 0.0, 5.0):
463 s = socket.socket(socket.AF_INET)
464 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200465 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100466 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000467
Christian Heimesd0486372016-09-10 23:23:33 +0200468 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000469 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000470 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000471 "certfile must be specified",
472 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000473 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000474 "certfile must be specified for server-side operations",
475 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000476 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000477 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200478 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100479 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
480 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200481 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200482 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000483 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000484 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000485 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200486 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000487 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000488 ssl.wrap_socket(sock,
489 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000490 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200491 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000492 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000493 ssl.wrap_socket(sock,
494 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000495 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496
Martin Panter3464ea22016-02-01 21:58:11 +0000497 def bad_cert_test(self, certfile):
498 """Check that trying to use the given client certificate fails"""
499 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
500 certfile)
501 sock = socket.socket()
502 self.addCleanup(sock.close)
503 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200504 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200505 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000506
507 def test_empty_cert(self):
508 """Wrapping with an empty cert file"""
509 self.bad_cert_test("nullcert.pem")
510
511 def test_malformed_cert(self):
512 """Wrapping with a badly formatted certificate (syntax error)"""
513 self.bad_cert_test("badcert.pem")
514
515 def test_malformed_key(self):
516 """Wrapping with a badly formatted key (syntax error)"""
517 self.bad_cert_test("badkey.pem")
518
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000519 def test_match_hostname(self):
520 def ok(cert, hostname):
521 ssl.match_hostname(cert, hostname)
522 def fail(cert, hostname):
523 self.assertRaises(ssl.CertificateError,
524 ssl.match_hostname, cert, hostname)
525
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100526 # -- Hostname matching --
527
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000528 cert = {'subject': ((('commonName', 'example.com'),),)}
529 ok(cert, 'example.com')
530 ok(cert, 'ExAmple.cOm')
531 fail(cert, 'www.example.com')
532 fail(cert, '.example.com')
533 fail(cert, 'example.org')
534 fail(cert, 'exampleXcom')
535
536 cert = {'subject': ((('commonName', '*.a.com'),),)}
537 ok(cert, 'foo.a.com')
538 fail(cert, 'bar.foo.a.com')
539 fail(cert, 'a.com')
540 fail(cert, 'Xa.com')
541 fail(cert, '.a.com')
542
Mandeep Singhede2ac92017-11-27 04:01:27 +0530543 # only match wildcards when they are the only thing
544 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530546 fail(cert, 'foo.com')
547 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000548 fail(cert, 'bar.com')
549 fail(cert, 'foo.a.com')
550 fail(cert, 'bar.foo.com')
551
Christian Heimes824f7f32013-08-17 00:54:47 +0200552 # NULL bytes are bad, CVE-2013-4073
553 cert = {'subject': ((('commonName',
554 'null.python.org\x00example.org'),),)}
555 ok(cert, 'null.python.org\x00example.org') # or raise an error?
556 fail(cert, 'example.org')
557 fail(cert, 'null.python.org')
558
Georg Brandl72c98d32013-10-27 07:16:53 +0100559 # error cases with wildcards
560 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
561 fail(cert, 'bar.foo.a.com')
562 fail(cert, 'a.com')
563 fail(cert, 'Xa.com')
564 fail(cert, '.a.com')
565
566 cert = {'subject': ((('commonName', 'a.*.com'),),)}
567 fail(cert, 'a.foo.com')
568 fail(cert, 'a..com')
569 fail(cert, 'a.com')
570
571 # wildcard doesn't match IDNA prefix 'xn--'
572 idna = 'püthon.python.org'.encode("idna").decode("ascii")
573 cert = {'subject': ((('commonName', idna),),)}
574 ok(cert, idna)
575 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
576 fail(cert, idna)
577 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
578 fail(cert, idna)
579
580 # wildcard in first fragment and IDNA A-labels in sequent fragments
581 # are supported.
582 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530584 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
585 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
587 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
588
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000589 # Slightly fake real-world example
590 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
591 'subject': ((('commonName', 'linuxfrz.org'),),),
592 'subjectAltName': (('DNS', 'linuxfr.org'),
593 ('DNS', 'linuxfr.com'),
594 ('othername', '<unsupported>'))}
595 ok(cert, 'linuxfr.org')
596 ok(cert, 'linuxfr.com')
597 # Not a "DNS" entry
598 fail(cert, '<unsupported>')
599 # When there is a subjectAltName, commonName isn't used
600 fail(cert, 'linuxfrz.org')
601
602 # A pristine real-world example
603 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
604 'subject': ((('countryName', 'US'),),
605 (('stateOrProvinceName', 'California'),),
606 (('localityName', 'Mountain View'),),
607 (('organizationName', 'Google Inc'),),
608 (('commonName', 'mail.google.com'),))}
609 ok(cert, 'mail.google.com')
610 fail(cert, 'gmail.com')
611 # Only commonName is considered
612 fail(cert, 'California')
613
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100614 # -- IPv4 matching --
615 cert = {'subject': ((('commonName', 'example.com'),),),
616 'subjectAltName': (('DNS', 'example.com'),
617 ('IP Address', '10.11.12.13'),
618 ('IP Address', '14.15.16.17'))}
619 ok(cert, '10.11.12.13')
620 ok(cert, '14.15.16.17')
621 fail(cert, '14.15.16.18')
622 fail(cert, 'example.net')
623
624 # -- IPv6 matching --
625 cert = {'subject': ((('commonName', 'example.com'),),),
626 'subjectAltName': (('DNS', 'example.com'),
627 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
628 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
629 ok(cert, '2001::cafe')
630 ok(cert, '2003::baba')
631 fail(cert, '2003::bebe')
632 fail(cert, 'example.net')
633
634 # -- Miscellaneous --
635
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000636 # Neither commonName nor subjectAltName
637 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
638 'subject': ((('countryName', 'US'),),
639 (('stateOrProvinceName', 'California'),),
640 (('localityName', 'Mountain View'),),
641 (('organizationName', 'Google Inc'),))}
642 fail(cert, 'mail.google.com')
643
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200644 # No DNS entry in subjectAltName but a commonName
645 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
646 'subject': ((('countryName', 'US'),),
647 (('stateOrProvinceName', 'California'),),
648 (('localityName', 'Mountain View'),),
649 (('commonName', 'mail.google.com'),)),
650 'subjectAltName': (('othername', 'blabla'), )}
651 ok(cert, 'mail.google.com')
652
653 # No DNS entry subjectAltName and no commonName
654 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),)),
659 'subjectAltName': (('othername', 'blabla'),)}
660 fail(cert, 'google.com')
661
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000662 # Empty cert / no cert
663 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
664 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
665
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200666 # Issue #17980: avoid denials of service by refusing more than one
667 # wildcard per fragment.
668 cert = {'subject': ((('commonName', 'a*b.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530669 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200670 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100671 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200672 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
673 with self.assertRaises(ssl.CertificateError) as cm:
674 ssl.match_hostname(cert, 'axxbxxc.com')
675 self.assertIn("too many wildcards", str(cm.exception))
676
Antoine Pitroud5323212010-10-22 18:19:07 +0000677 def test_server_side(self):
678 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200679 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000680 with socket.socket() as sock:
681 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
682 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000683
Antoine Pitroud6494802011-07-21 01:11:30 +0200684 def test_unknown_channel_binding(self):
685 # should raise ValueError for unknown type
686 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200687 s.bind(('127.0.0.1', 0))
688 s.listen()
689 c = socket.socket(socket.AF_INET)
690 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200691 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100692 with self.assertRaises(ValueError):
693 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200694 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200695
696 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
697 "'tls-unique' channel binding not available")
698 def test_tls_unique_channel_binding(self):
699 # unconnected should return None for known type
700 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200701 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100702 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200703 # the same for server-side
704 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200705 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100706 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200707
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600708 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200709 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600710 r = repr(ss)
711 with self.assertWarns(ResourceWarning) as cm:
712 ss = None
713 support.gc_collect()
714 self.assertIn(r, str(cm.warning.args[0]))
715
Christian Heimes6d7ad132013-06-09 18:02:55 +0200716 def test_get_default_verify_paths(self):
717 paths = ssl.get_default_verify_paths()
718 self.assertEqual(len(paths), 6)
719 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
720
721 with support.EnvironmentVarGuard() as env:
722 env["SSL_CERT_DIR"] = CAPATH
723 env["SSL_CERT_FILE"] = CERTFILE
724 paths = ssl.get_default_verify_paths()
725 self.assertEqual(paths.cafile, CERTFILE)
726 self.assertEqual(paths.capath, CAPATH)
727
Christian Heimes44109d72013-11-22 01:51:30 +0100728 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
729 def test_enum_certificates(self):
730 self.assertTrue(ssl.enum_certificates("CA"))
731 self.assertTrue(ssl.enum_certificates("ROOT"))
732
733 self.assertRaises(TypeError, ssl.enum_certificates)
734 self.assertRaises(WindowsError, ssl.enum_certificates, "")
735
Christian Heimesc2d65e12013-11-22 16:13:55 +0100736 trust_oids = set()
737 for storename in ("CA", "ROOT"):
738 store = ssl.enum_certificates(storename)
739 self.assertIsInstance(store, list)
740 for element in store:
741 self.assertIsInstance(element, tuple)
742 self.assertEqual(len(element), 3)
743 cert, enc, trust = element
744 self.assertIsInstance(cert, bytes)
745 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
746 self.assertIsInstance(trust, (set, bool))
747 if isinstance(trust, set):
748 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100749
750 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100751 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200752
Christian Heimes46bebee2013-06-09 19:03:31 +0200753 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100754 def test_enum_crls(self):
755 self.assertTrue(ssl.enum_crls("CA"))
756 self.assertRaises(TypeError, ssl.enum_crls)
757 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200758
Christian Heimes44109d72013-11-22 01:51:30 +0100759 crls = ssl.enum_crls("CA")
760 self.assertIsInstance(crls, list)
761 for element in crls:
762 self.assertIsInstance(element, tuple)
763 self.assertEqual(len(element), 2)
764 self.assertIsInstance(element[0], bytes)
765 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200766
Christian Heimes46bebee2013-06-09 19:03:31 +0200767
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100768 def test_asn1object(self):
769 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
770 '1.3.6.1.5.5.7.3.1')
771
772 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
773 self.assertEqual(val, expected)
774 self.assertEqual(val.nid, 129)
775 self.assertEqual(val.shortname, 'serverAuth')
776 self.assertEqual(val.longname, 'TLS Web Server Authentication')
777 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
778 self.assertIsInstance(val, ssl._ASN1Object)
779 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
780
781 val = ssl._ASN1Object.fromnid(129)
782 self.assertEqual(val, expected)
783 self.assertIsInstance(val, ssl._ASN1Object)
784 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100785 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
786 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100787 for i in range(1000):
788 try:
789 obj = ssl._ASN1Object.fromnid(i)
790 except ValueError:
791 pass
792 else:
793 self.assertIsInstance(obj.nid, int)
794 self.assertIsInstance(obj.shortname, str)
795 self.assertIsInstance(obj.longname, str)
796 self.assertIsInstance(obj.oid, (str, type(None)))
797
798 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
799 self.assertEqual(val, expected)
800 self.assertIsInstance(val, ssl._ASN1Object)
801 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
802 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
803 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100804 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
805 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100806
Christian Heimes72d28502013-11-23 13:56:58 +0100807 def test_purpose_enum(self):
808 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
809 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
810 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
811 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
812 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
813 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
814 '1.3.6.1.5.5.7.3.1')
815
816 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
817 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
818 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
819 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
820 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
821 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
822 '1.3.6.1.5.5.7.3.2')
823
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100824 def test_unsupported_dtls(self):
825 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
826 self.addCleanup(s.close)
827 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200828 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100829 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200830 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100831 with self.assertRaises(NotImplementedError) as cx:
832 ctx.wrap_socket(s)
833 self.assertEqual(str(cx.exception), "only stream sockets are supported")
834
Antoine Pitrouc695c952014-04-28 20:57:36 +0200835 def cert_time_ok(self, timestring, timestamp):
836 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
837
838 def cert_time_fail(self, timestring):
839 with self.assertRaises(ValueError):
840 ssl.cert_time_to_seconds(timestring)
841
842 @unittest.skipUnless(utc_offset(),
843 'local time needs to be different from UTC')
844 def test_cert_time_to_seconds_timezone(self):
845 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
846 # results if local timezone is not UTC
847 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
848 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
849
850 def test_cert_time_to_seconds(self):
851 timestring = "Jan 5 09:34:43 2018 GMT"
852 ts = 1515144883.0
853 self.cert_time_ok(timestring, ts)
854 # accept keyword parameter, assert its name
855 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
856 # accept both %e and %d (space or zero generated by strftime)
857 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
858 # case-insensitive
859 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
860 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
861 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
862 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
863 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
864 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
865 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
866 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
867
868 newyear_ts = 1230768000.0
869 # leap seconds
870 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
871 # same timestamp
872 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
873
874 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
875 # allow 60th second (even if it is not a leap second)
876 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
877 # allow 2nd leap second for compatibility with time.strptime()
878 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
879 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
880
Mike53f7a7c2017-12-14 14:04:53 +0300881 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200882 # 99991231235959Z (rfc 5280)
883 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
884
885 @support.run_with_locale('LC_ALL', '')
886 def test_cert_time_to_seconds_locale(self):
887 # `cert_time_to_seconds()` should be locale independent
888
889 def local_february_name():
890 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
891
892 if local_february_name().lower() == 'feb':
893 self.skipTest("locale-specific month name needs to be "
894 "different from C locale")
895
896 # locale-independent
897 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
898 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
899
Martin Panter3840b2a2016-03-27 01:53:46 +0000900 def test_connect_ex_error(self):
901 server = socket.socket(socket.AF_INET)
902 self.addCleanup(server.close)
903 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200904 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000905 cert_reqs=ssl.CERT_REQUIRED)
906 self.addCleanup(s.close)
907 rc = s.connect_ex((HOST, port))
908 # Issue #19919: Windows machines or VMs hosted on Windows
909 # machines sometimes return EWOULDBLOCK.
910 errors = (
911 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
912 errno.EWOULDBLOCK,
913 )
914 self.assertIn(rc, errors)
915
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100916
Antoine Pitrou152efa22010-05-16 18:19:27 +0000917class ContextTests(unittest.TestCase):
918
Antoine Pitrou23df4832010-08-04 17:14:06 +0000919 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000920 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100921 for protocol in PROTOCOLS:
922 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200923 ctx = ssl.SSLContext()
924 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000925 self.assertRaises(ValueError, ssl.SSLContext, -1)
926 self.assertRaises(ValueError, ssl.SSLContext, 42)
927
Antoine Pitrou23df4832010-08-04 17:14:06 +0000928 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000929 def test_protocol(self):
930 for proto in PROTOCOLS:
931 ctx = ssl.SSLContext(proto)
932 self.assertEqual(ctx.protocol, proto)
933
934 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200935 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000936 ctx.set_ciphers("ALL")
937 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000938 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000939 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000940
Christian Heimes892d66e2018-01-29 14:10:18 +0100941 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
942 "Test applies only to Python default ciphers")
943 def test_python_ciphers(self):
944 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
945 ciphers = ctx.get_ciphers()
946 for suite in ciphers:
947 name = suite['name']
948 self.assertNotIn("PSK", name)
949 self.assertNotIn("SRP", name)
950 self.assertNotIn("MD5", name)
951 self.assertNotIn("RC4", name)
952 self.assertNotIn("3DES", name)
953
Christian Heimes25bfcd52016-09-06 00:04:45 +0200954 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
955 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200956 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200957 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200958 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200959 self.assertIn('AES256-GCM-SHA384', names)
960 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200961
Antoine Pitrou23df4832010-08-04 17:14:06 +0000962 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000963 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800965 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +0200966 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +0200967 # SSLContext also enables these by default
968 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
969 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
Christian Heimes598894f2016-09-05 23:19:05 +0200970 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800971 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +0200972 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000973 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +0200974 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
975 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000976 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -0700977 # Ubuntu has OP_NO_SSLv3 forced on by default
978 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +0000979 else:
980 with self.assertRaises(ValueError):
981 ctx.options = 0
982
Christian Heimesa170fa12017-09-15 20:27:30 +0200983 def test_verify_mode_protocol(self):
984 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000985 # Default value
986 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
987 ctx.verify_mode = ssl.CERT_OPTIONAL
988 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
989 ctx.verify_mode = ssl.CERT_REQUIRED
990 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
991 ctx.verify_mode = ssl.CERT_NONE
992 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
993 with self.assertRaises(TypeError):
994 ctx.verify_mode = None
995 with self.assertRaises(ValueError):
996 ctx.verify_mode = 42
997
Christian Heimesa170fa12017-09-15 20:27:30 +0200998 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
999 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1000 self.assertFalse(ctx.check_hostname)
1001
1002 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1003 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1004 self.assertTrue(ctx.check_hostname)
1005
Christian Heimes61d478c2018-01-27 15:51:38 +01001006 def test_hostname_checks_common_name(self):
1007 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1008 self.assertTrue(ctx.hostname_checks_common_name)
1009 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1010 ctx.hostname_checks_common_name = True
1011 self.assertTrue(ctx.hostname_checks_common_name)
1012 ctx.hostname_checks_common_name = False
1013 self.assertFalse(ctx.hostname_checks_common_name)
1014 ctx.hostname_checks_common_name = True
1015 self.assertTrue(ctx.hostname_checks_common_name)
1016 else:
1017 with self.assertRaises(AttributeError):
1018 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001019
Christian Heimes2427b502013-11-23 11:24:32 +01001020 @unittest.skipUnless(have_verify_flags(),
1021 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001022 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001024 # default value
1025 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1026 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001027 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1028 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1029 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1030 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1031 ctx.verify_flags = ssl.VERIFY_DEFAULT
1032 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1033 # supports any value
1034 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1035 self.assertEqual(ctx.verify_flags,
1036 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1037 with self.assertRaises(TypeError):
1038 ctx.verify_flags = None
1039
Antoine Pitrou152efa22010-05-16 18:19:27 +00001040 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001041 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001042 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001043 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001044 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1045 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001046 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001047 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001048 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001049 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001050 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001051 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001052 ctx.load_cert_chain(EMPTYCERT)
1053 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001054 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001055 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1056 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1057 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001058 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001059 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001060 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001061 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001062 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001063 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1064 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001066 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001067 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001068 # Password protected key and cert
1069 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1070 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1071 ctx.load_cert_chain(CERTFILE_PROTECTED,
1072 password=bytearray(KEY_PASSWORD.encode()))
1073 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1074 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1075 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1076 bytearray(KEY_PASSWORD.encode()))
1077 with self.assertRaisesRegex(TypeError, "should be a string"):
1078 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1079 with self.assertRaises(ssl.SSLError):
1080 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1081 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1082 # openssl has a fixed limit on the password buffer.
1083 # PEM_BUFSIZE is generally set to 1kb.
1084 # Return a string larger than this.
1085 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1086 # Password callback
1087 def getpass_unicode():
1088 return KEY_PASSWORD
1089 def getpass_bytes():
1090 return KEY_PASSWORD.encode()
1091 def getpass_bytearray():
1092 return bytearray(KEY_PASSWORD.encode())
1093 def getpass_badpass():
1094 return "badpass"
1095 def getpass_huge():
1096 return b'a' * (1024 * 1024)
1097 def getpass_bad_type():
1098 return 9
1099 def getpass_exception():
1100 raise Exception('getpass error')
1101 class GetPassCallable:
1102 def __call__(self):
1103 return KEY_PASSWORD
1104 def getpass(self):
1105 return KEY_PASSWORD
1106 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1107 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1108 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1109 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1110 ctx.load_cert_chain(CERTFILE_PROTECTED,
1111 password=GetPassCallable().getpass)
1112 with self.assertRaises(ssl.SSLError):
1113 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1114 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1115 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1116 with self.assertRaisesRegex(TypeError, "must return a string"):
1117 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1118 with self.assertRaisesRegex(Exception, "getpass error"):
1119 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1120 # Make sure the password function isn't called if it isn't needed
1121 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001122
1123 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001124 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001125 ctx.load_verify_locations(CERTFILE)
1126 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1127 ctx.load_verify_locations(BYTES_CERTFILE)
1128 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1129 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001130 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001131 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001132 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001133 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001134 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001135 ctx.load_verify_locations(BADCERT)
1136 ctx.load_verify_locations(CERTFILE, CAPATH)
1137 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1138
Victor Stinner80f75e62011-01-29 11:31:20 +00001139 # Issue #10989: crash if the second argument type is invalid
1140 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1141
Christian Heimesefff7062013-11-21 03:35:02 +01001142 def test_load_verify_cadata(self):
1143 # test cadata
1144 with open(CAFILE_CACERT) as f:
1145 cacert_pem = f.read()
1146 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1147 with open(CAFILE_NEURONIO) as f:
1148 neuronio_pem = f.read()
1149 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1150
1151 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001153 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1154 ctx.load_verify_locations(cadata=cacert_pem)
1155 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1156 ctx.load_verify_locations(cadata=neuronio_pem)
1157 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1158 # cert already in hash table
1159 ctx.load_verify_locations(cadata=neuronio_pem)
1160 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1161
1162 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001163 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001164 combined = "\n".join((cacert_pem, neuronio_pem))
1165 ctx.load_verify_locations(cadata=combined)
1166 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1167
1168 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001170 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1171 neuronio_pem, "tail"]
1172 ctx.load_verify_locations(cadata="\n".join(combined))
1173 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1174
1175 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001177 ctx.load_verify_locations(cadata=cacert_der)
1178 ctx.load_verify_locations(cadata=neuronio_der)
1179 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1180 # cert already in hash table
1181 ctx.load_verify_locations(cadata=cacert_der)
1182 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1183
1184 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001185 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001186 combined = b"".join((cacert_der, neuronio_der))
1187 ctx.load_verify_locations(cadata=combined)
1188 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1189
1190 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001191 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001192 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1193
1194 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1195 ctx.load_verify_locations(cadata="broken")
1196 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1197 ctx.load_verify_locations(cadata=b"broken")
1198
1199
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001200 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001201 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001202 ctx.load_dh_params(DHFILE)
1203 if os.name != 'nt':
1204 ctx.load_dh_params(BYTES_DHFILE)
1205 self.assertRaises(TypeError, ctx.load_dh_params)
1206 self.assertRaises(TypeError, ctx.load_dh_params, None)
1207 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001208 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001209 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001210 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001211 ctx.load_dh_params(CERTFILE)
1212
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001213 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001214 def test_session_stats(self):
1215 for proto in PROTOCOLS:
1216 ctx = ssl.SSLContext(proto)
1217 self.assertEqual(ctx.session_stats(), {
1218 'number': 0,
1219 'connect': 0,
1220 'connect_good': 0,
1221 'connect_renegotiate': 0,
1222 'accept': 0,
1223 'accept_good': 0,
1224 'accept_renegotiate': 0,
1225 'hits': 0,
1226 'misses': 0,
1227 'timeouts': 0,
1228 'cache_full': 0,
1229 })
1230
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001231 def test_set_default_verify_paths(self):
1232 # There's not much we can do to test that it acts as expected,
1233 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001234 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001235 ctx.set_default_verify_paths()
1236
Antoine Pitrou501da612011-12-21 09:27:41 +01001237 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001238 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001239 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001240 ctx.set_ecdh_curve("prime256v1")
1241 ctx.set_ecdh_curve(b"prime256v1")
1242 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1243 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1244 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1245 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1246
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001247 @needs_sni
1248 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001249 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001250
1251 # set_servername_callback expects a callable, or None
1252 self.assertRaises(TypeError, ctx.set_servername_callback)
1253 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1254 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1255 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1256
1257 def dummycallback(sock, servername, ctx):
1258 pass
1259 ctx.set_servername_callback(None)
1260 ctx.set_servername_callback(dummycallback)
1261
1262 @needs_sni
1263 def test_sni_callback_refcycle(self):
1264 # Reference cycles through the servername callback are detected
1265 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001266 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001267 def dummycallback(sock, servername, ctx, cycle=ctx):
1268 pass
1269 ctx.set_servername_callback(dummycallback)
1270 wr = weakref.ref(ctx)
1271 del ctx, dummycallback
1272 gc.collect()
1273 self.assertIs(wr(), None)
1274
Christian Heimes9a5395a2013-06-17 15:44:12 +02001275 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001276 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001277 self.assertEqual(ctx.cert_store_stats(),
1278 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1279 ctx.load_cert_chain(CERTFILE)
1280 self.assertEqual(ctx.cert_store_stats(),
1281 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1282 ctx.load_verify_locations(CERTFILE)
1283 self.assertEqual(ctx.cert_store_stats(),
1284 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001285 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001286 self.assertEqual(ctx.cert_store_stats(),
1287 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1288
1289 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001290 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001291 self.assertEqual(ctx.get_ca_certs(), [])
1292 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1293 ctx.load_verify_locations(CERTFILE)
1294 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001295 # but CAFILE_CACERT is a CA cert
1296 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001297 self.assertEqual(ctx.get_ca_certs(),
1298 [{'issuer': ((('organizationName', 'Root CA'),),
1299 (('organizationalUnitName', 'http://www.cacert.org'),),
1300 (('commonName', 'CA Cert Signing Authority'),),
1301 (('emailAddress', 'support@cacert.org'),)),
1302 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1303 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1304 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001305 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001306 'subject': ((('organizationName', 'Root CA'),),
1307 (('organizationalUnitName', 'http://www.cacert.org'),),
1308 (('commonName', 'CA Cert Signing Authority'),),
1309 (('emailAddress', 'support@cacert.org'),)),
1310 'version': 3}])
1311
Martin Panterb55f8b72016-01-14 12:53:56 +00001312 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001313 pem = f.read()
1314 der = ssl.PEM_cert_to_DER_cert(pem)
1315 self.assertEqual(ctx.get_ca_certs(True), [der])
1316
Christian Heimes72d28502013-11-23 13:56:58 +01001317 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001318 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001319 ctx.load_default_certs()
1320
Christian Heimesa170fa12017-09-15 20:27:30 +02001321 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001322 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1323 ctx.load_default_certs()
1324
Christian Heimesa170fa12017-09-15 20:27:30 +02001325 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001326 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1327
Christian Heimesa170fa12017-09-15 20:27:30 +02001328 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001329 self.assertRaises(TypeError, ctx.load_default_certs, None)
1330 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1331
Benjamin Peterson91244e02014-10-03 18:17:15 -04001332 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001333 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001334 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001335 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001336 with support.EnvironmentVarGuard() as env:
1337 env["SSL_CERT_DIR"] = CAPATH
1338 env["SSL_CERT_FILE"] = CERTFILE
1339 ctx.load_default_certs()
1340 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1341
Benjamin Peterson91244e02014-10-03 18:17:15 -04001342 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001343 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001344 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001345 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001346 ctx.load_default_certs()
1347 stats = ctx.cert_store_stats()
1348
Christian Heimesa170fa12017-09-15 20:27:30 +02001349 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001350 with support.EnvironmentVarGuard() as env:
1351 env["SSL_CERT_DIR"] = CAPATH
1352 env["SSL_CERT_FILE"] = CERTFILE
1353 ctx.load_default_certs()
1354 stats["x509"] += 1
1355 self.assertEqual(ctx.cert_store_stats(), stats)
1356
Christian Heimes358cfd42016-09-10 22:43:48 +02001357 def _assert_context_options(self, ctx):
1358 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1359 if OP_NO_COMPRESSION != 0:
1360 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1361 OP_NO_COMPRESSION)
1362 if OP_SINGLE_DH_USE != 0:
1363 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1364 OP_SINGLE_DH_USE)
1365 if OP_SINGLE_ECDH_USE != 0:
1366 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1367 OP_SINGLE_ECDH_USE)
1368 if OP_CIPHER_SERVER_PREFERENCE != 0:
1369 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1370 OP_CIPHER_SERVER_PREFERENCE)
1371
Christian Heimes4c05b472013-11-23 15:58:30 +01001372 def test_create_default_context(self):
1373 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001374
Christian Heimesa170fa12017-09-15 20:27:30 +02001375 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001376 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001377 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001378 self._assert_context_options(ctx)
1379
Christian Heimes4c05b472013-11-23 15:58:30 +01001380 with open(SIGNING_CA) as f:
1381 cadata = f.read()
1382 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1383 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001384 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001385 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001386 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001387
1388 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001389 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001390 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001391 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001392
Christian Heimes67986f92013-11-23 22:43:47 +01001393 def test__create_stdlib_context(self):
1394 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001395 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001396 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001397 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001398 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001399
1400 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1401 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1402 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001403 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001404
1405 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001406 cert_reqs=ssl.CERT_REQUIRED,
1407 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001408 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1409 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001410 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001411 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001412
1413 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001414 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001415 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001416 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001417
Christian Heimes1aa9a752013-12-02 02:41:19 +01001418 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001419 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001420 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001421 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001422
Christian Heimese82c0342017-09-15 20:29:57 +02001423 # Auto set CERT_REQUIRED
1424 ctx.check_hostname = True
1425 self.assertTrue(ctx.check_hostname)
1426 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1427 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001428 ctx.verify_mode = ssl.CERT_REQUIRED
1429 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001430 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001431
Christian Heimese82c0342017-09-15 20:29:57 +02001432 # Changing verify_mode does not affect check_hostname
1433 ctx.check_hostname = False
1434 ctx.verify_mode = ssl.CERT_NONE
1435 ctx.check_hostname = False
1436 self.assertFalse(ctx.check_hostname)
1437 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1438 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001439 ctx.check_hostname = True
1440 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001441 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1442
1443 ctx.check_hostname = False
1444 ctx.verify_mode = ssl.CERT_OPTIONAL
1445 ctx.check_hostname = False
1446 self.assertFalse(ctx.check_hostname)
1447 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1448 # keep CERT_OPTIONAL
1449 ctx.check_hostname = True
1450 self.assertTrue(ctx.check_hostname)
1451 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001452
1453 # Cannot set CERT_NONE with check_hostname enabled
1454 with self.assertRaises(ValueError):
1455 ctx.verify_mode = ssl.CERT_NONE
1456 ctx.check_hostname = False
1457 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001458 ctx.verify_mode = ssl.CERT_NONE
1459 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001460
Christian Heimes5fe668c2016-09-12 00:01:11 +02001461 def test_context_client_server(self):
1462 # PROTOCOL_TLS_CLIENT has sane defaults
1463 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1464 self.assertTrue(ctx.check_hostname)
1465 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1466
1467 # PROTOCOL_TLS_SERVER has different but also sane defaults
1468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1469 self.assertFalse(ctx.check_hostname)
1470 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1471
Christian Heimes4df60f12017-09-15 20:26:05 +02001472 def test_context_custom_class(self):
1473 class MySSLSocket(ssl.SSLSocket):
1474 pass
1475
1476 class MySSLObject(ssl.SSLObject):
1477 pass
1478
1479 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1480 ctx.sslsocket_class = MySSLSocket
1481 ctx.sslobject_class = MySSLObject
1482
1483 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1484 self.assertIsInstance(sock, MySSLSocket)
1485 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1486 self.assertIsInstance(obj, MySSLObject)
1487
Antoine Pitrou152efa22010-05-16 18:19:27 +00001488
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001489class SSLErrorTests(unittest.TestCase):
1490
1491 def test_str(self):
1492 # The str() of a SSLError doesn't include the errno
1493 e = ssl.SSLError(1, "foo")
1494 self.assertEqual(str(e), "foo")
1495 self.assertEqual(e.errno, 1)
1496 # Same for a subclass
1497 e = ssl.SSLZeroReturnError(1, "foo")
1498 self.assertEqual(str(e), "foo")
1499 self.assertEqual(e.errno, 1)
1500
1501 def test_lib_reason(self):
1502 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001503 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001504 with self.assertRaises(ssl.SSLError) as cm:
1505 ctx.load_dh_params(CERTFILE)
1506 self.assertEqual(cm.exception.library, 'PEM')
1507 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1508 s = str(cm.exception)
1509 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1510
1511 def test_subclass(self):
1512 # Check that the appropriate SSLError subclass is raised
1513 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001514 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1515 ctx.check_hostname = False
1516 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001517 with socket.socket() as s:
1518 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001519 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001520 c = socket.socket()
1521 c.connect(s.getsockname())
1522 c.setblocking(False)
1523 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001524 with self.assertRaises(ssl.SSLWantReadError) as cm:
1525 c.do_handshake()
1526 s = str(cm.exception)
1527 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1528 # For compatibility
1529 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1530
1531
Christian Heimes61d478c2018-01-27 15:51:38 +01001532 def test_bad_server_hostname(self):
1533 ctx = ssl.create_default_context()
1534 with self.assertRaises(ValueError):
1535 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1536 server_hostname="")
1537 with self.assertRaises(ValueError):
1538 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1539 server_hostname=".example.org")
1540
1541
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001542class MemoryBIOTests(unittest.TestCase):
1543
1544 def test_read_write(self):
1545 bio = ssl.MemoryBIO()
1546 bio.write(b'foo')
1547 self.assertEqual(bio.read(), b'foo')
1548 self.assertEqual(bio.read(), b'')
1549 bio.write(b'foo')
1550 bio.write(b'bar')
1551 self.assertEqual(bio.read(), b'foobar')
1552 self.assertEqual(bio.read(), b'')
1553 bio.write(b'baz')
1554 self.assertEqual(bio.read(2), b'ba')
1555 self.assertEqual(bio.read(1), b'z')
1556 self.assertEqual(bio.read(1), b'')
1557
1558 def test_eof(self):
1559 bio = ssl.MemoryBIO()
1560 self.assertFalse(bio.eof)
1561 self.assertEqual(bio.read(), b'')
1562 self.assertFalse(bio.eof)
1563 bio.write(b'foo')
1564 self.assertFalse(bio.eof)
1565 bio.write_eof()
1566 self.assertFalse(bio.eof)
1567 self.assertEqual(bio.read(2), b'fo')
1568 self.assertFalse(bio.eof)
1569 self.assertEqual(bio.read(1), b'o')
1570 self.assertTrue(bio.eof)
1571 self.assertEqual(bio.read(), b'')
1572 self.assertTrue(bio.eof)
1573
1574 def test_pending(self):
1575 bio = ssl.MemoryBIO()
1576 self.assertEqual(bio.pending, 0)
1577 bio.write(b'foo')
1578 self.assertEqual(bio.pending, 3)
1579 for i in range(3):
1580 bio.read(1)
1581 self.assertEqual(bio.pending, 3-i-1)
1582 for i in range(3):
1583 bio.write(b'x')
1584 self.assertEqual(bio.pending, i+1)
1585 bio.read()
1586 self.assertEqual(bio.pending, 0)
1587
1588 def test_buffer_types(self):
1589 bio = ssl.MemoryBIO()
1590 bio.write(b'foo')
1591 self.assertEqual(bio.read(), b'foo')
1592 bio.write(bytearray(b'bar'))
1593 self.assertEqual(bio.read(), b'bar')
1594 bio.write(memoryview(b'baz'))
1595 self.assertEqual(bio.read(), b'baz')
1596
1597 def test_error_types(self):
1598 bio = ssl.MemoryBIO()
1599 self.assertRaises(TypeError, bio.write, 'foo')
1600 self.assertRaises(TypeError, bio.write, None)
1601 self.assertRaises(TypeError, bio.write, True)
1602 self.assertRaises(TypeError, bio.write, 1)
1603
1604
Martin Panter3840b2a2016-03-27 01:53:46 +00001605class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001606 """Tests that connect to a simple server running in the background"""
1607
1608 def setUp(self):
1609 server = ThreadedEchoServer(SIGNED_CERTFILE)
1610 self.server_addr = (HOST, server.port)
1611 server.__enter__()
1612 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001613
Antoine Pitrou480a1242010-04-28 21:37:09 +00001614 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001615 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001616 cert_reqs=ssl.CERT_NONE) as s:
1617 s.connect(self.server_addr)
1618 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001619 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001620
Martin Panter3840b2a2016-03-27 01:53:46 +00001621 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001622 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001623 cert_reqs=ssl.CERT_REQUIRED,
1624 ca_certs=SIGNING_CA) as s:
1625 s.connect(self.server_addr)
1626 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001627 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001628
Martin Panter3840b2a2016-03-27 01:53:46 +00001629 def test_connect_fail(self):
1630 # This should fail because we have no verification certs. Connection
1631 # failure crashes ThreadedEchoServer, so run this in an independent
1632 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001633 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001634 cert_reqs=ssl.CERT_REQUIRED)
1635 self.addCleanup(s.close)
1636 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1637 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001638
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001639 def test_connect_ex(self):
1640 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001641 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001642 cert_reqs=ssl.CERT_REQUIRED,
1643 ca_certs=SIGNING_CA)
1644 self.addCleanup(s.close)
1645 self.assertEqual(0, s.connect_ex(self.server_addr))
1646 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001647
1648 def test_non_blocking_connect_ex(self):
1649 # Issue #11326: non-blocking connect_ex() should allow handshake
1650 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001651 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001652 cert_reqs=ssl.CERT_REQUIRED,
1653 ca_certs=SIGNING_CA,
1654 do_handshake_on_connect=False)
1655 self.addCleanup(s.close)
1656 s.setblocking(False)
1657 rc = s.connect_ex(self.server_addr)
1658 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1659 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1660 # Wait for connect to finish
1661 select.select([], [s], [], 5.0)
1662 # Non-blocking handshake
1663 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001664 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001665 s.do_handshake()
1666 break
1667 except ssl.SSLWantReadError:
1668 select.select([s], [], [], 5.0)
1669 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001670 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001671 # SSL established
1672 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001673
Antoine Pitrou152efa22010-05-16 18:19:27 +00001674 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001675 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001676 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001677 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1678 s.connect(self.server_addr)
1679 self.assertEqual({}, s.getpeercert())
1680 # Same with a server hostname
1681 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1682 server_hostname="dummy") as s:
1683 s.connect(self.server_addr)
1684 ctx.verify_mode = ssl.CERT_REQUIRED
1685 # This should succeed because we specify the root cert
1686 ctx.load_verify_locations(SIGNING_CA)
1687 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1688 s.connect(self.server_addr)
1689 cert = s.getpeercert()
1690 self.assertTrue(cert)
1691
1692 def test_connect_with_context_fail(self):
1693 # This should fail because we have no verification certs. Connection
1694 # failure crashes ThreadedEchoServer, so run this in an independent
1695 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001696 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001697 ctx.verify_mode = ssl.CERT_REQUIRED
1698 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1699 self.addCleanup(s.close)
1700 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1701 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001702
1703 def test_connect_capath(self):
1704 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001705 # NOTE: the subject hashing algorithm has been changed between
1706 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1707 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001708 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001709 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001710 ctx.verify_mode = ssl.CERT_REQUIRED
1711 ctx.load_verify_locations(capath=CAPATH)
1712 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1713 s.connect(self.server_addr)
1714 cert = s.getpeercert()
1715 self.assertTrue(cert)
1716 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001717 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001718 ctx.verify_mode = ssl.CERT_REQUIRED
1719 ctx.load_verify_locations(capath=BYTES_CAPATH)
1720 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1721 s.connect(self.server_addr)
1722 cert = s.getpeercert()
1723 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001724
Christian Heimesefff7062013-11-21 03:35:02 +01001725 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001726 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001727 pem = f.read()
1728 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001729 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001730 ctx.verify_mode = ssl.CERT_REQUIRED
1731 ctx.load_verify_locations(cadata=pem)
1732 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1733 s.connect(self.server_addr)
1734 cert = s.getpeercert()
1735 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001736
Martin Panter3840b2a2016-03-27 01:53:46 +00001737 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001739 ctx.verify_mode = ssl.CERT_REQUIRED
1740 ctx.load_verify_locations(cadata=der)
1741 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1742 s.connect(self.server_addr)
1743 cert = s.getpeercert()
1744 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001745
Antoine Pitroue3220242010-04-24 11:13:53 +00001746 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1747 def test_makefile_close(self):
1748 # Issue #5238: creating a file-like object with makefile() shouldn't
1749 # delay closing the underlying "real socket" (here tested with its
1750 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001751 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001752 ss.connect(self.server_addr)
1753 fd = ss.fileno()
1754 f = ss.makefile()
1755 f.close()
1756 # The fd is still open
1757 os.read(fd, 0)
1758 # Closing the SSL socket should close the fd too
1759 ss.close()
1760 gc.collect()
1761 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001762 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001763 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001764
Antoine Pitrou480a1242010-04-28 21:37:09 +00001765 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001766 s = socket.socket(socket.AF_INET)
1767 s.connect(self.server_addr)
1768 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001769 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001770 cert_reqs=ssl.CERT_NONE,
1771 do_handshake_on_connect=False)
1772 self.addCleanup(s.close)
1773 count = 0
1774 while True:
1775 try:
1776 count += 1
1777 s.do_handshake()
1778 break
1779 except ssl.SSLWantReadError:
1780 select.select([s], [], [])
1781 except ssl.SSLWantWriteError:
1782 select.select([], [s], [])
1783 if support.verbose:
1784 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001785
Antoine Pitrou480a1242010-04-28 21:37:09 +00001786 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001787 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001788
Martin Panter3840b2a2016-03-27 01:53:46 +00001789 def test_get_server_certificate_fail(self):
1790 # Connection failure crashes ThreadedEchoServer, so run this in an
1791 # independent test method
1792 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001793
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001794 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001795 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001796 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1797 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001798 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001799 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1800 s.connect(self.server_addr)
1801 # Error checking can happen at instantiation or when connecting
1802 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1803 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001804 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001805 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1806 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001807
Christian Heimes9a5395a2013-06-17 15:44:12 +02001808 def test_get_ca_certs_capath(self):
1809 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001810 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001811 ctx.load_verify_locations(capath=CAPATH)
1812 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001813 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1814 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001815 s.connect(self.server_addr)
1816 cert = s.getpeercert()
1817 self.assertTrue(cert)
1818 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001819
Christian Heimes575596e2013-12-15 21:49:17 +01001820 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001821 def test_context_setget(self):
1822 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001823 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1824 ctx1.load_verify_locations(capath=CAPATH)
1825 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1826 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001827 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001828 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001829 ss.connect(self.server_addr)
1830 self.assertIs(ss.context, ctx1)
1831 self.assertIs(ss._sslobj.context, ctx1)
1832 ss.context = ctx2
1833 self.assertIs(ss.context, ctx2)
1834 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001835
1836 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1837 # A simple IO loop. Call func(*args) depending on the error we get
1838 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1839 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001840 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001841 count = 0
1842 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001843 if time.monotonic() > deadline:
1844 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001845 errno = None
1846 count += 1
1847 try:
1848 ret = func(*args)
1849 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001850 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001851 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001852 raise
1853 errno = e.errno
1854 # Get any data from the outgoing BIO irrespective of any error, and
1855 # send it to the socket.
1856 buf = outgoing.read()
1857 sock.sendall(buf)
1858 # If there's no error, we're done. For WANT_READ, we need to get
1859 # data from the socket and put it in the incoming BIO.
1860 if errno is None:
1861 break
1862 elif errno == ssl.SSL_ERROR_WANT_READ:
1863 buf = sock.recv(32768)
1864 if buf:
1865 incoming.write(buf)
1866 else:
1867 incoming.write_eof()
1868 if support.verbose:
1869 sys.stdout.write("Needed %d calls to complete %s().\n"
1870 % (count, func.__name__))
1871 return ret
1872
Martin Panter3840b2a2016-03-27 01:53:46 +00001873 def test_bio_handshake(self):
1874 sock = socket.socket(socket.AF_INET)
1875 self.addCleanup(sock.close)
1876 sock.connect(self.server_addr)
1877 incoming = ssl.MemoryBIO()
1878 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001879 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1880 self.assertTrue(ctx.check_hostname)
1881 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001882 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001883 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1884 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001885 self.assertIs(sslobj._sslobj.owner, sslobj)
1886 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001887 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001888 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001889 self.assertRaises(ValueError, sslobj.getpeercert)
1890 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1891 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1892 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1893 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001894 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001895 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001896 self.assertTrue(sslobj.getpeercert())
1897 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1898 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1899 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001900 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001901 except ssl.SSLSyscallError:
1902 # If the server shuts down the TCP connection without sending a
1903 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1904 pass
1905 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1906
1907 def test_bio_read_write_data(self):
1908 sock = socket.socket(socket.AF_INET)
1909 self.addCleanup(sock.close)
1910 sock.connect(self.server_addr)
1911 incoming = ssl.MemoryBIO()
1912 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001913 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 ctx.verify_mode = ssl.CERT_NONE
1915 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1916 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1917 req = b'FOO\n'
1918 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1919 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1920 self.assertEqual(buf, b'foo\n')
1921 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001922
1923
Martin Panter3840b2a2016-03-27 01:53:46 +00001924class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001925
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 def test_timeout_connect_ex(self):
1927 # Issue #12065: on a timeout, connect_ex() should return the original
1928 # errno (mimicking the behaviour of non-SSL sockets).
1929 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02001930 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001931 cert_reqs=ssl.CERT_REQUIRED,
1932 do_handshake_on_connect=False)
1933 self.addCleanup(s.close)
1934 s.settimeout(0.0000001)
1935 rc = s.connect_ex((REMOTE_HOST, 443))
1936 if rc == 0:
1937 self.skipTest("REMOTE_HOST responded too quickly")
1938 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1939
1940 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1941 def test_get_server_certificate_ipv6(self):
1942 with support.transient_internet('ipv6.google.com'):
1943 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1944 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1945
Martin Panter3840b2a2016-03-27 01:53:46 +00001946
1947def _test_get_server_certificate(test, host, port, cert=None):
1948 pem = ssl.get_server_certificate((host, port))
1949 if not pem:
1950 test.fail("No server certificate on %s:%s!" % (host, port))
1951
1952 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1953 if not pem:
1954 test.fail("No server certificate on %s:%s!" % (host, port))
1955 if support.verbose:
1956 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1957
1958def _test_get_server_certificate_fail(test, host, port):
1959 try:
1960 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1961 except ssl.SSLError as x:
1962 #should fail
1963 if support.verbose:
1964 sys.stdout.write("%s\n" % x)
1965 else:
1966 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1967
1968
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001969from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001970
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001971class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001972
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001973 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001974
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001975 """A mildly complicated class, because we want it to work both
1976 with and without the SSL wrapper around the socket connection, so
1977 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001978
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001979 def __init__(self, server, connsock, addr):
1980 self.server = server
1981 self.running = False
1982 self.sock = connsock
1983 self.addr = addr
1984 self.sock.setblocking(1)
1985 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001986 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001987 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001988
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001989 def wrap_conn(self):
1990 try:
1991 self.sslconn = self.server.context.wrap_socket(
1992 self.sock, server_side=True)
1993 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1994 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
1995 except (ssl.SSLError, ConnectionResetError, OSError) as e:
1996 # We treat ConnectionResetError as though it were an
1997 # SSLError - OpenSSL on Ubuntu abruptly closes the
1998 # connection when asked to use an unsupported protocol.
1999 #
2000 # OSError may occur with wrong protocols, e.g. both
2001 # sides use PROTOCOL_TLS_SERVER.
2002 #
2003 # XXX Various errors can have happened here, for example
2004 # a mismatching protocol version, an invalid certificate,
2005 # or a low-level bug. This should be made more discriminating.
2006 #
2007 # bpo-31323: Store the exception as string to prevent
2008 # a reference leak: server -> conn_errors -> exception
2009 # -> traceback -> self (ConnectionHandler) -> server
2010 self.server.conn_errors.append(str(e))
2011 if self.server.chatty:
2012 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2013 self.running = False
2014 self.server.stop()
2015 self.close()
2016 return False
2017 else:
2018 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2019 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2020 cert = self.sslconn.getpeercert()
2021 if support.verbose and self.server.chatty:
2022 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2023 cert_binary = self.sslconn.getpeercert(True)
2024 if support.verbose and self.server.chatty:
2025 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2026 cipher = self.sslconn.cipher()
2027 if support.verbose and self.server.chatty:
2028 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2029 sys.stdout.write(" server: selected protocol is now "
2030 + str(self.sslconn.selected_npn_protocol()) + "\n")
2031 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002033 def read(self):
2034 if self.sslconn:
2035 return self.sslconn.read()
2036 else:
2037 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002038
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002039 def write(self, bytes):
2040 if self.sslconn:
2041 return self.sslconn.write(bytes)
2042 else:
2043 return self.sock.send(bytes)
2044
2045 def close(self):
2046 if self.sslconn:
2047 self.sslconn.close()
2048 else:
2049 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002050
Antoine Pitrou480a1242010-04-28 21:37:09 +00002051 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002052 self.running = True
2053 if not self.server.starttls_server:
2054 if not self.wrap_conn():
2055 return
2056 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002057 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002058 msg = self.read()
2059 stripped = msg.strip()
2060 if not stripped:
2061 # eof, so quit this handler
2062 self.running = False
2063 try:
2064 self.sock = self.sslconn.unwrap()
2065 except OSError:
2066 # Many tests shut the TCP connection down
2067 # without an SSL shutdown. This causes
2068 # unwrap() to raise OSError with errno=0!
2069 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002070 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002071 self.sslconn = None
2072 self.close()
2073 elif stripped == b'over':
2074 if support.verbose and self.server.connectionchatty:
2075 sys.stdout.write(" server: client closed connection\n")
2076 self.close()
2077 return
2078 elif (self.server.starttls_server and
2079 stripped == b'STARTTLS'):
2080 if support.verbose and self.server.connectionchatty:
2081 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2082 self.write(b"OK\n")
2083 if not self.wrap_conn():
2084 return
2085 elif (self.server.starttls_server and self.sslconn
2086 and stripped == b'ENDTLS'):
2087 if support.verbose and self.server.connectionchatty:
2088 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2089 self.write(b"OK\n")
2090 self.sock = self.sslconn.unwrap()
2091 self.sslconn = None
2092 if support.verbose and self.server.connectionchatty:
2093 sys.stdout.write(" server: connection is now unencrypted...\n")
2094 elif stripped == b'CB tls-unique':
2095 if support.verbose and self.server.connectionchatty:
2096 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2097 data = self.sslconn.get_channel_binding("tls-unique")
2098 self.write(repr(data).encode("us-ascii") + b"\n")
2099 else:
2100 if (support.verbose and
2101 self.server.connectionchatty):
2102 ctype = (self.sslconn and "encrypted") or "unencrypted"
2103 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2104 % (msg, ctype, msg.lower(), ctype))
2105 self.write(msg.lower())
2106 except OSError:
2107 if self.server.chatty:
2108 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002109 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002110 self.running = False
2111 # normally, we'd just stop here, but for the test
2112 # harness, we want to stop the server
2113 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002114
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002115 def __init__(self, certificate=None, ssl_version=None,
2116 certreqs=None, cacerts=None,
2117 chatty=True, connectionchatty=False, starttls_server=False,
2118 npn_protocols=None, alpn_protocols=None,
2119 ciphers=None, context=None):
2120 if context:
2121 self.context = context
2122 else:
2123 self.context = ssl.SSLContext(ssl_version
2124 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002125 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002126 self.context.verify_mode = (certreqs if certreqs is not None
2127 else ssl.CERT_NONE)
2128 if cacerts:
2129 self.context.load_verify_locations(cacerts)
2130 if certificate:
2131 self.context.load_cert_chain(certificate)
2132 if npn_protocols:
2133 self.context.set_npn_protocols(npn_protocols)
2134 if alpn_protocols:
2135 self.context.set_alpn_protocols(alpn_protocols)
2136 if ciphers:
2137 self.context.set_ciphers(ciphers)
2138 self.chatty = chatty
2139 self.connectionchatty = connectionchatty
2140 self.starttls_server = starttls_server
2141 self.sock = socket.socket()
2142 self.port = support.bind_port(self.sock)
2143 self.flag = None
2144 self.active = False
2145 self.selected_npn_protocols = []
2146 self.selected_alpn_protocols = []
2147 self.shared_ciphers = []
2148 self.conn_errors = []
2149 threading.Thread.__init__(self)
2150 self.daemon = True
2151
2152 def __enter__(self):
2153 self.start(threading.Event())
2154 self.flag.wait()
2155 return self
2156
2157 def __exit__(self, *args):
2158 self.stop()
2159 self.join()
2160
2161 def start(self, flag=None):
2162 self.flag = flag
2163 threading.Thread.start(self)
2164
2165 def run(self):
2166 self.sock.settimeout(0.05)
2167 self.sock.listen()
2168 self.active = True
2169 if self.flag:
2170 # signal an event
2171 self.flag.set()
2172 while self.active:
2173 try:
2174 newconn, connaddr = self.sock.accept()
2175 if support.verbose and self.chatty:
2176 sys.stdout.write(' server: new connection from '
2177 + repr(connaddr) + '\n')
2178 handler = self.ConnectionHandler(self, newconn, connaddr)
2179 handler.start()
2180 handler.join()
2181 except socket.timeout:
2182 pass
2183 except KeyboardInterrupt:
2184 self.stop()
2185 self.sock.close()
2186
2187 def stop(self):
2188 self.active = False
2189
2190class AsyncoreEchoServer(threading.Thread):
2191
2192 # this one's based on asyncore.dispatcher
2193
2194 class EchoServer (asyncore.dispatcher):
2195
2196 class ConnectionHandler(asyncore.dispatcher_with_send):
2197
2198 def __init__(self, conn, certfile):
2199 self.socket = test_wrap_socket(conn, server_side=True,
2200 certfile=certfile,
2201 do_handshake_on_connect=False)
2202 asyncore.dispatcher_with_send.__init__(self, self.socket)
2203 self._ssl_accepting = True
2204 self._do_ssl_handshake()
2205
2206 def readable(self):
2207 if isinstance(self.socket, ssl.SSLSocket):
2208 while self.socket.pending() > 0:
2209 self.handle_read_event()
2210 return True
2211
2212 def _do_ssl_handshake(self):
2213 try:
2214 self.socket.do_handshake()
2215 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2216 return
2217 except ssl.SSLEOFError:
2218 return self.handle_close()
2219 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002220 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002221 except OSError as err:
2222 if err.args[0] == errno.ECONNABORTED:
2223 return self.handle_close()
2224 else:
2225 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002226
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002227 def handle_read(self):
2228 if self._ssl_accepting:
2229 self._do_ssl_handshake()
2230 else:
2231 data = self.recv(1024)
2232 if support.verbose:
2233 sys.stdout.write(" server: read %s from client\n" % repr(data))
2234 if not data:
2235 self.close()
2236 else:
2237 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002238
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002239 def handle_close(self):
2240 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002241 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002242 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002243
2244 def handle_error(self):
2245 raise
2246
Trent Nelson78520002008-04-10 20:54:35 +00002247 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002248 self.certfile = certfile
2249 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2250 self.port = support.bind_port(sock, '')
2251 asyncore.dispatcher.__init__(self, sock)
2252 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002253
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002254 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002255 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002256 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2257 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002258
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002259 def handle_error(self):
2260 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002261
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002262 def __init__(self, certfile):
2263 self.flag = None
2264 self.active = False
2265 self.server = self.EchoServer(certfile)
2266 self.port = self.server.port
2267 threading.Thread.__init__(self)
2268 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002269
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002270 def __str__(self):
2271 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002272
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002273 def __enter__(self):
2274 self.start(threading.Event())
2275 self.flag.wait()
2276 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002277
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002278 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002279 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002280 sys.stdout.write(" cleanup: stopping server.\n")
2281 self.stop()
2282 if support.verbose:
2283 sys.stdout.write(" cleanup: joining server thread.\n")
2284 self.join()
2285 if support.verbose:
2286 sys.stdout.write(" cleanup: successfully joined.\n")
2287 # make sure that ConnectionHandler is removed from socket_map
2288 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002289
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002290 def start (self, flag=None):
2291 self.flag = flag
2292 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002293
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 def run(self):
2295 self.active = True
2296 if self.flag:
2297 self.flag.set()
2298 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002299 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002300 asyncore.loop(1)
2301 except:
2302 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002303
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002304 def stop(self):
2305 self.active = False
2306 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002307
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002308def server_params_test(client_context, server_context, indata=b"FOO\n",
2309 chatty=True, connectionchatty=False, sni_name=None,
2310 session=None):
2311 """
2312 Launch a server, connect a client to it and try various reads
2313 and writes.
2314 """
2315 stats = {}
2316 server = ThreadedEchoServer(context=server_context,
2317 chatty=chatty,
2318 connectionchatty=False)
2319 with server:
2320 with client_context.wrap_socket(socket.socket(),
2321 server_hostname=sni_name, session=session) as s:
2322 s.connect((HOST, server.port))
2323 for arg in [indata, bytearray(indata), memoryview(indata)]:
2324 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002325 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002326 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002327 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002328 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002329 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002330 if connectionchatty:
2331 if support.verbose:
2332 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002333 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002334 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002335 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2336 % (outdata[:20], len(outdata),
2337 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002338 s.write(b"over\n")
2339 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002340 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002341 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002342 stats.update({
2343 'compression': s.compression(),
2344 'cipher': s.cipher(),
2345 'peercert': s.getpeercert(),
2346 'client_alpn_protocol': s.selected_alpn_protocol(),
2347 'client_npn_protocol': s.selected_npn_protocol(),
2348 'version': s.version(),
2349 'session_reused': s.session_reused,
2350 'session': s.session,
2351 })
2352 s.close()
2353 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2354 stats['server_npn_protocols'] = server.selected_npn_protocols
2355 stats['server_shared_ciphers'] = server.shared_ciphers
2356 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002357
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002358def try_protocol_combo(server_protocol, client_protocol, expect_success,
2359 certsreqs=None, server_options=0, client_options=0):
2360 """
2361 Try to SSL-connect using *client_protocol* to *server_protocol*.
2362 If *expect_success* is true, assert that the connection succeeds,
2363 if it's false, assert that the connection fails.
2364 Also, if *expect_success* is a string, assert that it is the protocol
2365 version actually used by the connection.
2366 """
2367 if certsreqs is None:
2368 certsreqs = ssl.CERT_NONE
2369 certtype = {
2370 ssl.CERT_NONE: "CERT_NONE",
2371 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2372 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2373 }[certsreqs]
2374 if support.verbose:
2375 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2376 sys.stdout.write(formatstr %
2377 (ssl.get_protocol_name(client_protocol),
2378 ssl.get_protocol_name(server_protocol),
2379 certtype))
2380 client_context = ssl.SSLContext(client_protocol)
2381 client_context.options |= client_options
2382 server_context = ssl.SSLContext(server_protocol)
2383 server_context.options |= server_options
2384
2385 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2386 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2387 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002388 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002389 client_context.set_ciphers("ALL")
2390
2391 for ctx in (client_context, server_context):
2392 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002393 ctx.load_cert_chain(SIGNED_CERTFILE)
2394 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002395 try:
2396 stats = server_params_test(client_context, server_context,
2397 chatty=False, connectionchatty=False)
2398 # Protocol mismatch can result in either an SSLError, or a
2399 # "Connection reset by peer" error.
2400 except ssl.SSLError:
2401 if expect_success:
2402 raise
2403 except OSError as e:
2404 if expect_success or e.errno != errno.ECONNRESET:
2405 raise
2406 else:
2407 if not expect_success:
2408 raise AssertionError(
2409 "Client protocol %s succeeded with server protocol %s!"
2410 % (ssl.get_protocol_name(client_protocol),
2411 ssl.get_protocol_name(server_protocol)))
2412 elif (expect_success is not True
2413 and expect_success != stats['version']):
2414 raise AssertionError("version mismatch: expected %r, got %r"
2415 % (expect_success, stats['version']))
2416
2417
2418class ThreadedTests(unittest.TestCase):
2419
2420 @skip_if_broken_ubuntu_ssl
2421 def test_echo(self):
2422 """Basic test of an SSL client connecting to a server"""
2423 if support.verbose:
2424 sys.stdout.write("\n")
2425 for protocol in PROTOCOLS:
2426 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2427 continue
2428 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2429 context = ssl.SSLContext(protocol)
2430 context.load_cert_chain(CERTFILE)
2431 server_params_test(context, context,
2432 chatty=True, connectionchatty=True)
2433
Christian Heimesa170fa12017-09-15 20:27:30 +02002434 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002435
2436 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2437 server_params_test(client_context=client_context,
2438 server_context=server_context,
2439 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002440 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441
2442 client_context.check_hostname = False
2443 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2444 with self.assertRaises(ssl.SSLError) as e:
2445 server_params_test(client_context=server_context,
2446 server_context=client_context,
2447 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002448 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002449 self.assertIn('called a function you should not call',
2450 str(e.exception))
2451
2452 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2453 with self.assertRaises(ssl.SSLError) as e:
2454 server_params_test(client_context=server_context,
2455 server_context=server_context,
2456 chatty=True, connectionchatty=True)
2457 self.assertIn('called a function you should not call',
2458 str(e.exception))
2459
2460 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2461 with self.assertRaises(ssl.SSLError) as e:
2462 server_params_test(client_context=server_context,
2463 server_context=client_context,
2464 chatty=True, connectionchatty=True)
2465 self.assertIn('called a function you should not call',
2466 str(e.exception))
2467
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002468 def test_getpeercert(self):
2469 if support.verbose:
2470 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002471
2472 client_context, server_context, hostname = testing_context()
2473 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002474 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002475 with client_context.wrap_socket(socket.socket(),
2476 do_handshake_on_connect=False,
2477 server_hostname=hostname) as s:
2478 s.connect((HOST, server.port))
2479 # getpeercert() raise ValueError while the handshake isn't
2480 # done.
2481 with self.assertRaises(ValueError):
2482 s.getpeercert()
2483 s.do_handshake()
2484 cert = s.getpeercert()
2485 self.assertTrue(cert, "Can't get peer certificate.")
2486 cipher = s.cipher()
2487 if support.verbose:
2488 sys.stdout.write(pprint.pformat(cert) + '\n')
2489 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2490 if 'subject' not in cert:
2491 self.fail("No subject field in certificate: %s." %
2492 pprint.pformat(cert))
2493 if ((('organizationName', 'Python Software Foundation'),)
2494 not in cert['subject']):
2495 self.fail(
2496 "Missing or invalid 'organizationName' field in certificate subject; "
2497 "should be 'Python Software Foundation'.")
2498 self.assertIn('notBefore', cert)
2499 self.assertIn('notAfter', cert)
2500 before = ssl.cert_time_to_seconds(cert['notBefore'])
2501 after = ssl.cert_time_to_seconds(cert['notAfter'])
2502 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002503
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002504 @unittest.skipUnless(have_verify_flags(),
2505 "verify_flags need OpenSSL > 0.9.8")
2506 def test_crl_check(self):
2507 if support.verbose:
2508 sys.stdout.write("\n")
2509
Christian Heimesa170fa12017-09-15 20:27:30 +02002510 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002511
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002512 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002513 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002514
2515 # VERIFY_DEFAULT should pass
2516 server = ThreadedEchoServer(context=server_context, chatty=True)
2517 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002518 with client_context.wrap_socket(socket.socket(),
2519 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002520 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 cert = s.getpeercert()
2522 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002523
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002524 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002525 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002526
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 server = ThreadedEchoServer(context=server_context, chatty=True)
2528 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002529 with client_context.wrap_socket(socket.socket(),
2530 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002531 with self.assertRaisesRegex(ssl.SSLError,
2532 "certificate verify failed"):
2533 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002534
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002536 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002537
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002538 server = ThreadedEchoServer(context=server_context, chatty=True)
2539 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002540 with client_context.wrap_socket(socket.socket(),
2541 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002542 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002543 cert = s.getpeercert()
2544 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002545
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002546 def test_check_hostname(self):
2547 if support.verbose:
2548 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002549
Christian Heimesa170fa12017-09-15 20:27:30 +02002550 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002551
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002552 # correct hostname should verify
2553 server = ThreadedEchoServer(context=server_context, chatty=True)
2554 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002555 with client_context.wrap_socket(socket.socket(),
2556 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 s.connect((HOST, server.port))
2558 cert = s.getpeercert()
2559 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002560
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 # incorrect hostname should raise an exception
2562 server = ThreadedEchoServer(context=server_context, chatty=True)
2563 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002564 with client_context.wrap_socket(socket.socket(),
2565 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002566 with self.assertRaisesRegex(
2567 ssl.CertificateError,
2568 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002569 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002570
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002571 # missing server_hostname arg should cause an exception, too
2572 server = ThreadedEchoServer(context=server_context, chatty=True)
2573 with server:
2574 with socket.socket() as s:
2575 with self.assertRaisesRegex(ValueError,
2576 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002577 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002578
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002579 def test_ecc_cert(self):
2580 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2581 client_context.load_verify_locations(SIGNING_CA)
2582 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2583 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2584
2585 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2586 # load ECC cert
2587 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2588
2589 # correct hostname should verify
2590 server = ThreadedEchoServer(context=server_context, chatty=True)
2591 with server:
2592 with client_context.wrap_socket(socket.socket(),
2593 server_hostname=hostname) as s:
2594 s.connect((HOST, server.port))
2595 cert = s.getpeercert()
2596 self.assertTrue(cert, "Can't get peer certificate.")
2597 cipher = s.cipher()[0].split('-')
2598 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2599
2600 def test_dual_rsa_ecc(self):
2601 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2602 client_context.load_verify_locations(SIGNING_CA)
2603 # only ECDSA certs
2604 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2605 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2606
2607 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2608 # load ECC and RSA key/cert pairs
2609 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2610 server_context.load_cert_chain(SIGNED_CERTFILE)
2611
2612 # correct hostname should verify
2613 server = ThreadedEchoServer(context=server_context, chatty=True)
2614 with server:
2615 with client_context.wrap_socket(socket.socket(),
2616 server_hostname=hostname) as s:
2617 s.connect((HOST, server.port))
2618 cert = s.getpeercert()
2619 self.assertTrue(cert, "Can't get peer certificate.")
2620 cipher = s.cipher()[0].split('-')
2621 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2622
Christian Heimes66e57422018-01-29 14:25:13 +01002623 def test_check_hostname_idn(self):
2624 if support.verbose:
2625 sys.stdout.write("\n")
2626
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002627 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002628 server_context.load_cert_chain(IDNSANSFILE)
2629
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002630 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002631 context.verify_mode = ssl.CERT_REQUIRED
2632 context.check_hostname = True
2633 context.load_verify_locations(SIGNING_CA)
2634
2635 # correct hostname should verify, when specified in several
2636 # different ways
2637 idn_hostnames = [
2638 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002639 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002640 ('xn--knig-5qa.idn.pythontest.net',
2641 'xn--knig-5qa.idn.pythontest.net'),
2642 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002643 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002644
2645 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002646 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002647 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2648 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2649 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002650 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2651
2652 # ('königsgäßchen.idna2008.pythontest.net',
2653 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2654 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2655 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2656 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2657 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2658
Christian Heimes66e57422018-01-29 14:25:13 +01002659 ]
2660 for server_hostname, expected_hostname in idn_hostnames:
2661 server = ThreadedEchoServer(context=server_context, chatty=True)
2662 with server:
2663 with context.wrap_socket(socket.socket(),
2664 server_hostname=server_hostname) as s:
2665 self.assertEqual(s.server_hostname, expected_hostname)
2666 s.connect((HOST, server.port))
2667 cert = s.getpeercert()
2668 self.assertEqual(s.server_hostname, expected_hostname)
2669 self.assertTrue(cert, "Can't get peer certificate.")
2670
2671 with ssl.SSLSocket(socket.socket(),
2672 server_hostname=server_hostname) as s:
2673 s.connect((HOST, server.port))
2674 s.getpeercert()
2675 self.assertEqual(s.server_hostname, expected_hostname)
2676
Christian Heimes66e57422018-01-29 14:25:13 +01002677 # incorrect hostname should raise an exception
2678 server = ThreadedEchoServer(context=server_context, chatty=True)
2679 with server:
2680 with context.wrap_socket(socket.socket(),
2681 server_hostname="python.example.org") as s:
2682 with self.assertRaises(ssl.CertificateError):
2683 s.connect((HOST, server.port))
2684
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002685 def test_wrong_cert(self):
2686 """Connecting when the server rejects the client's certificate
2687
2688 Launch a server with CERT_REQUIRED, and check that trying to
2689 connect to it with a wrong client certificate fails.
2690 """
2691 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2692 "wrongcert.pem")
2693 server = ThreadedEchoServer(CERTFILE,
2694 certreqs=ssl.CERT_REQUIRED,
2695 cacerts=CERTFILE, chatty=False,
2696 connectionchatty=False)
2697 with server, \
2698 socket.socket() as sock, \
Christian Heimesa170fa12017-09-15 20:27:30 +02002699 test_wrap_socket(sock, certfile=certfile) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002700 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002701 # Expect either an SSL error about the server rejecting
2702 # the connection, or a low-level connection reset (which
2703 # sometimes happens on Windows)
2704 s.connect((HOST, server.port))
2705 except ssl.SSLError as e:
2706 if support.verbose:
2707 sys.stdout.write("\nSSLError is %r\n" % e)
2708 except OSError as e:
2709 if e.errno != errno.ECONNRESET:
2710 raise
2711 if support.verbose:
2712 sys.stdout.write("\nsocket.error is %r\n" % e)
2713 else:
2714 self.fail("Use of invalid cert should have failed!")
2715
2716 def test_rude_shutdown(self):
2717 """A brutal shutdown of an SSL server should raise an OSError
2718 in the client when attempting handshake.
2719 """
2720 listener_ready = threading.Event()
2721 listener_gone = threading.Event()
2722
2723 s = socket.socket()
2724 port = support.bind_port(s, HOST)
2725
2726 # `listener` runs in a thread. It sits in an accept() until
2727 # the main thread connects. Then it rudely closes the socket,
2728 # and sets Event `listener_gone` to let the main thread know
2729 # the socket is gone.
2730 def listener():
2731 s.listen()
2732 listener_ready.set()
2733 newsock, addr = s.accept()
2734 newsock.close()
2735 s.close()
2736 listener_gone.set()
2737
2738 def connector():
2739 listener_ready.wait()
2740 with socket.socket() as c:
2741 c.connect((HOST, port))
2742 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002743 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002744 ssl_sock = test_wrap_socket(c)
2745 except OSError:
2746 pass
2747 else:
2748 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002749
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002750 t = threading.Thread(target=listener)
2751 t.start()
2752 try:
2753 connector()
2754 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002755 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002756
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002757 def test_ssl_cert_verify_error(self):
2758 if support.verbose:
2759 sys.stdout.write("\n")
2760
2761 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2762 server_context.load_cert_chain(SIGNED_CERTFILE)
2763
2764 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2765
2766 server = ThreadedEchoServer(context=server_context, chatty=True)
2767 with server:
2768 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002769 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002770 try:
2771 s.connect((HOST, server.port))
2772 except ssl.SSLError as e:
2773 msg = 'unable to get local issuer certificate'
2774 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2775 self.assertEqual(e.verify_code, 20)
2776 self.assertEqual(e.verify_message, msg)
2777 self.assertIn(msg, repr(e))
2778 self.assertIn('certificate verify failed', repr(e))
2779
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 @skip_if_broken_ubuntu_ssl
2781 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2782 "OpenSSL is compiled without SSLv2 support")
2783 def test_protocol_sslv2(self):
2784 """Connecting to an SSLv2 server with various client options"""
2785 if support.verbose:
2786 sys.stdout.write("\n")
2787 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2788 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2789 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002790 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002791 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2792 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2793 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2794 # SSLv23 client with specific SSL options
2795 if no_sslv2_implies_sslv3_hello():
2796 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002797 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002798 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002799 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002800 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002801 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002802 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002803
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002804 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002805 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002806 """Connecting to an SSLv23 server with various client options"""
2807 if support.verbose:
2808 sys.stdout.write("\n")
2809 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002810 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002811 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002812 except OSError as x:
2813 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2814 if support.verbose:
2815 sys.stdout.write(
2816 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2817 % str(x))
2818 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002819 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2820 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2821 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002822
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002823 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002824 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2825 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2826 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002827
2828 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002829 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2830 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2831 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002832
2833 # Server with specific SSL options
2834 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002835 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002836 server_options=ssl.OP_NO_SSLv3)
2837 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002838 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002839 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002840 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002841 server_options=ssl.OP_NO_TLSv1)
2842
2843
2844 @skip_if_broken_ubuntu_ssl
2845 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2846 "OpenSSL is compiled without SSLv3 support")
2847 def test_protocol_sslv3(self):
2848 """Connecting to an SSLv3 server with various client options"""
2849 if support.verbose:
2850 sys.stdout.write("\n")
2851 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2852 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2853 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2854 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2855 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002856 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002857 client_options=ssl.OP_NO_SSLv3)
2858 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2859 if no_sslv2_implies_sslv3_hello():
2860 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002861 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002862 False, client_options=ssl.OP_NO_SSLv2)
2863
2864 @skip_if_broken_ubuntu_ssl
2865 def test_protocol_tlsv1(self):
2866 """Connecting to a TLSv1 server with various client options"""
2867 if support.verbose:
2868 sys.stdout.write("\n")
2869 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2870 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2871 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2872 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2873 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2874 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2875 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002876 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002877 client_options=ssl.OP_NO_TLSv1)
2878
2879 @skip_if_broken_ubuntu_ssl
2880 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2881 "TLS version 1.1 not supported.")
2882 def test_protocol_tlsv1_1(self):
2883 """Connecting to a TLSv1.1 server with various client options.
2884 Testing against older TLS versions."""
2885 if support.verbose:
2886 sys.stdout.write("\n")
2887 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2888 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2889 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2890 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2891 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002892 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002893 client_options=ssl.OP_NO_TLSv1_1)
2894
Christian Heimesa170fa12017-09-15 20:27:30 +02002895 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2897 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2898
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002899 @skip_if_broken_ubuntu_ssl
2900 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2901 "TLS version 1.2 not supported.")
2902 def test_protocol_tlsv1_2(self):
2903 """Connecting to a TLSv1.2 server with various client options.
2904 Testing against older TLS versions."""
2905 if support.verbose:
2906 sys.stdout.write("\n")
2907 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2908 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2909 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2910 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2911 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2912 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2913 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002914 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002915 client_options=ssl.OP_NO_TLSv1_2)
2916
Christian Heimesa170fa12017-09-15 20:27:30 +02002917 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002918 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2919 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2920 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2921 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2922
2923 def test_starttls(self):
2924 """Switching from clear text to encrypted and back again."""
2925 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2926
2927 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002928 starttls_server=True,
2929 chatty=True,
2930 connectionchatty=True)
2931 wrapped = False
2932 with server:
2933 s = socket.socket()
2934 s.setblocking(1)
2935 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002936 if support.verbose:
2937 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002938 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002939 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002940 sys.stdout.write(
2941 " client: sending %r...\n" % indata)
2942 if wrapped:
2943 conn.write(indata)
2944 outdata = conn.read()
2945 else:
2946 s.send(indata)
2947 outdata = s.recv(1024)
2948 msg = outdata.strip().lower()
2949 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2950 # STARTTLS ok, switch to secure mode
2951 if support.verbose:
2952 sys.stdout.write(
2953 " client: read %r from server, starting TLS...\n"
2954 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02002955 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002956 wrapped = True
2957 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2958 # ENDTLS ok, switch back to clear text
2959 if support.verbose:
2960 sys.stdout.write(
2961 " client: read %r from server, ending TLS...\n"
2962 % msg)
2963 s = conn.unwrap()
2964 wrapped = False
2965 else:
2966 if support.verbose:
2967 sys.stdout.write(
2968 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002969 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002970 sys.stdout.write(" client: closing connection.\n")
2971 if wrapped:
2972 conn.write(b"over\n")
2973 else:
2974 s.send(b"over\n")
2975 if wrapped:
2976 conn.close()
2977 else:
2978 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002979
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002980 def test_socketserver(self):
2981 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002982 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002983 # try to connect
2984 if support.verbose:
2985 sys.stdout.write('\n')
2986 with open(CERTFILE, 'rb') as f:
2987 d1 = f.read()
2988 d2 = ''
2989 # now fetch the same data from the HTTPS server
2990 url = 'https://localhost:%d/%s' % (
2991 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002992 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002993 f = urllib.request.urlopen(url, context=context)
2994 try:
2995 dlen = f.info().get("content-length")
2996 if dlen and (int(dlen) > 0):
2997 d2 = f.read(int(dlen))
2998 if support.verbose:
2999 sys.stdout.write(
3000 " client: read %d bytes from remote server '%s'\n"
3001 % (len(d2), server))
3002 finally:
3003 f.close()
3004 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003005
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003006 def test_asyncore_server(self):
3007 """Check the example asyncore integration."""
3008 if support.verbose:
3009 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003010
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003011 indata = b"FOO\n"
3012 server = AsyncoreEchoServer(CERTFILE)
3013 with server:
3014 s = test_wrap_socket(socket.socket())
3015 s.connect(('127.0.0.1', server.port))
3016 if support.verbose:
3017 sys.stdout.write(
3018 " client: sending %r...\n" % indata)
3019 s.write(indata)
3020 outdata = s.read()
3021 if support.verbose:
3022 sys.stdout.write(" client: read %r\n" % outdata)
3023 if outdata != indata.lower():
3024 self.fail(
3025 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3026 % (outdata[:20], len(outdata),
3027 indata[:20].lower(), len(indata)))
3028 s.write(b"over\n")
3029 if support.verbose:
3030 sys.stdout.write(" client: closing connection.\n")
3031 s.close()
3032 if support.verbose:
3033 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003034
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003035 def test_recv_send(self):
3036 """Test recv(), send() and friends."""
3037 if support.verbose:
3038 sys.stdout.write("\n")
3039
3040 server = ThreadedEchoServer(CERTFILE,
3041 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003042 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003043 cacerts=CERTFILE,
3044 chatty=True,
3045 connectionchatty=False)
3046 with server:
3047 s = test_wrap_socket(socket.socket(),
3048 server_side=False,
3049 certfile=CERTFILE,
3050 ca_certs=CERTFILE,
3051 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003052 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003053 s.connect((HOST, server.port))
3054 # helper methods for standardising recv* method signatures
3055 def _recv_into():
3056 b = bytearray(b"\0"*100)
3057 count = s.recv_into(b)
3058 return b[:count]
3059
3060 def _recvfrom_into():
3061 b = bytearray(b"\0"*100)
3062 count, addr = s.recvfrom_into(b)
3063 return b[:count]
3064
3065 # (name, method, expect success?, *args, return value func)
3066 send_methods = [
3067 ('send', s.send, True, [], len),
3068 ('sendto', s.sendto, False, ["some.address"], len),
3069 ('sendall', s.sendall, True, [], lambda x: None),
3070 ]
3071 # (name, method, whether to expect success, *args)
3072 recv_methods = [
3073 ('recv', s.recv, True, []),
3074 ('recvfrom', s.recvfrom, False, ["some.address"]),
3075 ('recv_into', _recv_into, True, []),
3076 ('recvfrom_into', _recvfrom_into, False, []),
3077 ]
3078 data_prefix = "PREFIX_"
3079
3080 for (meth_name, send_meth, expect_success, args,
3081 ret_val_meth) in send_methods:
3082 indata = (data_prefix + meth_name).encode('ascii')
3083 try:
3084 ret = send_meth(indata, *args)
3085 msg = "sending with {}".format(meth_name)
3086 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3087 outdata = s.read()
3088 if outdata != indata.lower():
3089 self.fail(
3090 "While sending with <<{name:s}>> bad data "
3091 "<<{outdata:r}>> ({nout:d}) received; "
3092 "expected <<{indata:r}>> ({nin:d})\n".format(
3093 name=meth_name, outdata=outdata[:20],
3094 nout=len(outdata),
3095 indata=indata[:20], nin=len(indata)
3096 )
3097 )
3098 except ValueError as e:
3099 if expect_success:
3100 self.fail(
3101 "Failed to send with method <<{name:s}>>; "
3102 "expected to succeed.\n".format(name=meth_name)
3103 )
3104 if not str(e).startswith(meth_name):
3105 self.fail(
3106 "Method <<{name:s}>> failed with unexpected "
3107 "exception message: {exp:s}\n".format(
3108 name=meth_name, exp=e
3109 )
3110 )
3111
3112 for meth_name, recv_meth, expect_success, args in recv_methods:
3113 indata = (data_prefix + meth_name).encode('ascii')
3114 try:
3115 s.send(indata)
3116 outdata = recv_meth(*args)
3117 if outdata != indata.lower():
3118 self.fail(
3119 "While receiving with <<{name:s}>> bad data "
3120 "<<{outdata:r}>> ({nout:d}) received; "
3121 "expected <<{indata:r}>> ({nin:d})\n".format(
3122 name=meth_name, outdata=outdata[:20],
3123 nout=len(outdata),
3124 indata=indata[:20], nin=len(indata)
3125 )
3126 )
3127 except ValueError as e:
3128 if expect_success:
3129 self.fail(
3130 "Failed to receive with method <<{name:s}>>; "
3131 "expected to succeed.\n".format(name=meth_name)
3132 )
3133 if not str(e).startswith(meth_name):
3134 self.fail(
3135 "Method <<{name:s}>> failed with unexpected "
3136 "exception message: {exp:s}\n".format(
3137 name=meth_name, exp=e
3138 )
3139 )
3140 # consume data
3141 s.read()
3142
3143 # read(-1, buffer) is supported, even though read(-1) is not
3144 data = b"data"
3145 s.send(data)
3146 buffer = bytearray(len(data))
3147 self.assertEqual(s.read(-1, buffer), len(data))
3148 self.assertEqual(buffer, data)
3149
Christian Heimes888bbdc2017-09-07 14:18:21 -07003150 # sendall accepts bytes-like objects
3151 if ctypes is not None:
3152 ubyte = ctypes.c_ubyte * len(data)
3153 byteslike = ubyte.from_buffer_copy(data)
3154 s.sendall(byteslike)
3155 self.assertEqual(s.read(), data)
3156
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003157 # Make sure sendmsg et al are disallowed to avoid
3158 # inadvertent disclosure of data and/or corruption
3159 # of the encrypted data stream
3160 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3161 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3162 self.assertRaises(NotImplementedError,
3163 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003164 s.write(b"over\n")
3165
3166 self.assertRaises(ValueError, s.recv, -1)
3167 self.assertRaises(ValueError, s.read, -1)
3168
3169 s.close()
3170
3171 def test_recv_zero(self):
3172 server = ThreadedEchoServer(CERTFILE)
3173 server.__enter__()
3174 self.addCleanup(server.__exit__, None, None)
3175 s = socket.create_connection((HOST, server.port))
3176 self.addCleanup(s.close)
3177 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3178 self.addCleanup(s.close)
3179
3180 # recv/read(0) should return no data
3181 s.send(b"data")
3182 self.assertEqual(s.recv(0), b"")
3183 self.assertEqual(s.read(0), b"")
3184 self.assertEqual(s.read(), b"data")
3185
3186 # Should not block if the other end sends no data
3187 s.setblocking(False)
3188 self.assertEqual(s.recv(0), b"")
3189 self.assertEqual(s.recv_into(bytearray()), 0)
3190
3191 def test_nonblocking_send(self):
3192 server = ThreadedEchoServer(CERTFILE,
3193 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003194 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003195 cacerts=CERTFILE,
3196 chatty=True,
3197 connectionchatty=False)
3198 with server:
3199 s = test_wrap_socket(socket.socket(),
3200 server_side=False,
3201 certfile=CERTFILE,
3202 ca_certs=CERTFILE,
3203 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003204 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003205 s.connect((HOST, server.port))
3206 s.setblocking(False)
3207
3208 # If we keep sending data, at some point the buffers
3209 # will be full and the call will block
3210 buf = bytearray(8192)
3211 def fill_buffer():
3212 while True:
3213 s.send(buf)
3214 self.assertRaises((ssl.SSLWantWriteError,
3215 ssl.SSLWantReadError), fill_buffer)
3216
3217 # Now read all the output and discard it
3218 s.setblocking(True)
3219 s.close()
3220
3221 def test_handshake_timeout(self):
3222 # Issue #5103: SSL handshake must respect the socket timeout
3223 server = socket.socket(socket.AF_INET)
3224 host = "127.0.0.1"
3225 port = support.bind_port(server)
3226 started = threading.Event()
3227 finish = False
3228
3229 def serve():
3230 server.listen()
3231 started.set()
3232 conns = []
3233 while not finish:
3234 r, w, e = select.select([server], [], [], 0.1)
3235 if server in r:
3236 # Let the socket hang around rather than having
3237 # it closed by garbage collection.
3238 conns.append(server.accept()[0])
3239 for sock in conns:
3240 sock.close()
3241
3242 t = threading.Thread(target=serve)
3243 t.start()
3244 started.wait()
3245
3246 try:
3247 try:
3248 c = socket.socket(socket.AF_INET)
3249 c.settimeout(0.2)
3250 c.connect((host, port))
3251 # Will attempt handshake and time out
3252 self.assertRaisesRegex(socket.timeout, "timed out",
3253 test_wrap_socket, c)
3254 finally:
3255 c.close()
3256 try:
3257 c = socket.socket(socket.AF_INET)
3258 c = test_wrap_socket(c)
3259 c.settimeout(0.2)
3260 # Will attempt handshake and time out
3261 self.assertRaisesRegex(socket.timeout, "timed out",
3262 c.connect, (host, port))
3263 finally:
3264 c.close()
3265 finally:
3266 finish = True
3267 t.join()
3268 server.close()
3269
3270 def test_server_accept(self):
3271 # Issue #16357: accept() on a SSLSocket created through
3272 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003273 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003274 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003275 context.load_verify_locations(SIGNING_CA)
3276 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003277 server = socket.socket(socket.AF_INET)
3278 host = "127.0.0.1"
3279 port = support.bind_port(server)
3280 server = context.wrap_socket(server, server_side=True)
3281 self.assertTrue(server.server_side)
3282
3283 evt = threading.Event()
3284 remote = None
3285 peer = None
3286 def serve():
3287 nonlocal remote, peer
3288 server.listen()
3289 # Block on the accept and wait on the connection to close.
3290 evt.set()
3291 remote, peer = server.accept()
3292 remote.recv(1)
3293
3294 t = threading.Thread(target=serve)
3295 t.start()
3296 # Client wait until server setup and perform a connect.
3297 evt.wait()
3298 client = context.wrap_socket(socket.socket())
3299 client.connect((host, port))
3300 client_addr = client.getsockname()
3301 client.close()
3302 t.join()
3303 remote.close()
3304 server.close()
3305 # Sanity checks.
3306 self.assertIsInstance(remote, ssl.SSLSocket)
3307 self.assertEqual(peer, client_addr)
3308
3309 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003310 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003311 with context.wrap_socket(socket.socket()) as sock:
3312 with self.assertRaises(OSError) as cm:
3313 sock.getpeercert()
3314 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3315
3316 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003317 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003318 with context.wrap_socket(socket.socket()) as sock:
3319 with self.assertRaises(OSError) as cm:
3320 sock.do_handshake()
3321 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3322
3323 def test_default_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003324 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 try:
3326 # Force a set of weak ciphers on our client context
3327 context.set_ciphers("DES")
3328 except ssl.SSLError:
3329 self.skipTest("no DES cipher available")
3330 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003331 ssl_version=ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003332 chatty=False) as server:
3333 with context.wrap_socket(socket.socket()) as s:
3334 with self.assertRaises(OSError):
3335 s.connect((HOST, server.port))
3336 self.assertIn("no shared cipher", server.conn_errors[0])
3337
3338 def test_version_basic(self):
3339 """
3340 Basic tests for SSLSocket.version().
3341 More tests are done in the test_protocol_*() methods.
3342 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003343 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3344 context.check_hostname = False
3345 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003346 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003347 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003348 chatty=False) as server:
3349 with context.wrap_socket(socket.socket()) as s:
3350 self.assertIs(s.version(), None)
3351 s.connect((HOST, server.port))
Christian Heimesa170fa12017-09-15 20:27:30 +02003352 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3353 self.assertEqual(s.version(), 'TLSv1.2')
3354 else: # 0.9.8 to 1.0.1
3355 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003356 self.assertIs(s.version(), None)
3357
Christian Heimescb5b68a2017-09-07 18:07:00 -07003358 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3359 "test requires TLSv1.3 enabled OpenSSL")
3360 def test_tls1_3(self):
3361 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3362 context.load_cert_chain(CERTFILE)
3363 # disable all but TLS 1.3
3364 context.options |= (
3365 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3366 )
3367 with ThreadedEchoServer(context=context) as server:
3368 with context.wrap_socket(socket.socket()) as s:
3369 s.connect((HOST, server.port))
3370 self.assertIn(s.cipher()[0], [
3371 'TLS13-AES-256-GCM-SHA384',
3372 'TLS13-CHACHA20-POLY1305-SHA256',
3373 'TLS13-AES-128-GCM-SHA256',
3374 ])
3375
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003376 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3377 def test_default_ecdh_curve(self):
3378 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3379 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003380 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003381 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003382 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3383 # cipher name.
3384 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003385 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3386 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3387 # our default cipher list should prefer ECDH-based ciphers
3388 # automatically.
3389 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3390 context.set_ciphers("ECCdraft:ECDH")
3391 with ThreadedEchoServer(context=context) as server:
3392 with context.wrap_socket(socket.socket()) as s:
3393 s.connect((HOST, server.port))
3394 self.assertIn("ECDH", s.cipher()[0])
3395
3396 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3397 "'tls-unique' channel binding not available")
3398 def test_tls_unique_channel_binding(self):
3399 """Test tls-unique channel binding."""
3400 if support.verbose:
3401 sys.stdout.write("\n")
3402
3403 server = ThreadedEchoServer(CERTFILE,
3404 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003405 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003406 cacerts=CERTFILE,
3407 chatty=True,
3408 connectionchatty=False)
3409 with server:
3410 s = test_wrap_socket(socket.socket(),
3411 server_side=False,
3412 certfile=CERTFILE,
3413 ca_certs=CERTFILE,
3414 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003415 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003416 s.connect((HOST, server.port))
3417 # get the data
3418 cb_data = s.get_channel_binding("tls-unique")
3419 if support.verbose:
3420 sys.stdout.write(" got channel binding data: {0!r}\n"
3421 .format(cb_data))
3422
3423 # check if it is sane
3424 self.assertIsNotNone(cb_data)
3425 self.assertEqual(len(cb_data), 12) # True for TLSv1
3426
3427 # and compare with the peers version
3428 s.write(b"CB tls-unique\n")
3429 peer_data_repr = s.read().strip()
3430 self.assertEqual(peer_data_repr,
3431 repr(cb_data).encode("us-ascii"))
3432 s.close()
3433
3434 # now, again
3435 s = test_wrap_socket(socket.socket(),
3436 server_side=False,
3437 certfile=CERTFILE,
3438 ca_certs=CERTFILE,
3439 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003440 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003441 s.connect((HOST, server.port))
3442 new_cb_data = s.get_channel_binding("tls-unique")
3443 if support.verbose:
3444 sys.stdout.write(" got another channel binding data: {0!r}\n"
3445 .format(new_cb_data))
3446 # is it really unique
3447 self.assertNotEqual(cb_data, new_cb_data)
3448 self.assertIsNotNone(cb_data)
3449 self.assertEqual(len(cb_data), 12) # True for TLSv1
3450 s.write(b"CB tls-unique\n")
3451 peer_data_repr = s.read().strip()
3452 self.assertEqual(peer_data_repr,
3453 repr(new_cb_data).encode("us-ascii"))
3454 s.close()
3455
3456 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003457 client_context, server_context, hostname = testing_context()
3458 stats = server_params_test(client_context, server_context,
3459 chatty=True, connectionchatty=True,
3460 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003461 if support.verbose:
3462 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3463 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3464
3465 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3466 "ssl.OP_NO_COMPRESSION needed for this test")
3467 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003468 client_context, server_context, hostname = testing_context()
3469 client_context.options |= ssl.OP_NO_COMPRESSION
3470 server_context.options |= ssl.OP_NO_COMPRESSION
3471 stats = server_params_test(client_context, server_context,
3472 chatty=True, connectionchatty=True,
3473 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003474 self.assertIs(stats['compression'], None)
3475
3476 def test_dh_params(self):
3477 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003478 client_context, server_context, hostname = testing_context()
3479 server_context.load_dh_params(DHFILE)
3480 server_context.set_ciphers("kEDH")
3481 stats = server_params_test(client_context, server_context,
3482 chatty=True, connectionchatty=True,
3483 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003484 cipher = stats["cipher"][0]
3485 parts = cipher.split("-")
3486 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3487 self.fail("Non-DH cipher: " + cipher[0])
3488
3489 def test_selected_alpn_protocol(self):
3490 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003491 client_context, server_context, hostname = testing_context()
3492 stats = server_params_test(client_context, server_context,
3493 chatty=True, connectionchatty=True,
3494 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003495 self.assertIs(stats['client_alpn_protocol'], None)
3496
3497 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3498 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3499 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003500 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003501 server_context.set_alpn_protocols(['foo', 'bar'])
3502 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003503 chatty=True, connectionchatty=True,
3504 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003505 self.assertIs(stats['client_alpn_protocol'], None)
3506
3507 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3508 def test_alpn_protocols(self):
3509 server_protocols = ['foo', 'bar', 'milkshake']
3510 protocol_tests = [
3511 (['foo', 'bar'], 'foo'),
3512 (['bar', 'foo'], 'foo'),
3513 (['milkshake'], 'milkshake'),
3514 (['http/3.0', 'http/4.0'], None)
3515 ]
3516 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003517 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003518 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003519 client_context.set_alpn_protocols(client_protocols)
3520
3521 try:
3522 stats = server_params_test(client_context,
3523 server_context,
3524 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003525 connectionchatty=True,
3526 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003527 except ssl.SSLError as e:
3528 stats = e
3529
3530 if (expected is None and IS_OPENSSL_1_1
3531 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3532 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3533 self.assertIsInstance(stats, ssl.SSLError)
3534 else:
3535 msg = "failed trying %s (s) and %s (c).\n" \
3536 "was expecting %s, but got %%s from the %%s" \
3537 % (str(server_protocols), str(client_protocols),
3538 str(expected))
3539 client_result = stats['client_alpn_protocol']
3540 self.assertEqual(client_result, expected,
3541 msg % (client_result, "client"))
3542 server_result = stats['server_alpn_protocols'][-1] \
3543 if len(stats['server_alpn_protocols']) else 'nothing'
3544 self.assertEqual(server_result, expected,
3545 msg % (server_result, "server"))
3546
3547 def test_selected_npn_protocol(self):
3548 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003549 client_context, server_context, hostname = testing_context()
3550 stats = server_params_test(client_context, server_context,
3551 chatty=True, connectionchatty=True,
3552 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003553 self.assertIs(stats['client_npn_protocol'], None)
3554
3555 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3556 def test_npn_protocols(self):
3557 server_protocols = ['http/1.1', 'spdy/2']
3558 protocol_tests = [
3559 (['http/1.1', 'spdy/2'], 'http/1.1'),
3560 (['spdy/2', 'http/1.1'], 'http/1.1'),
3561 (['spdy/2', 'test'], 'spdy/2'),
3562 (['abc', 'def'], 'abc')
3563 ]
3564 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003565 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003566 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003567 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003568 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003569 chatty=True, connectionchatty=True,
3570 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003571 msg = "failed trying %s (s) and %s (c).\n" \
3572 "was expecting %s, but got %%s from the %%s" \
3573 % (str(server_protocols), str(client_protocols),
3574 str(expected))
3575 client_result = stats['client_npn_protocol']
3576 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3577 server_result = stats['server_npn_protocols'][-1] \
3578 if len(stats['server_npn_protocols']) else 'nothing'
3579 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3580
3581 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003582 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003583 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003584 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003585 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003586 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003587 client_context.load_verify_locations(SIGNING_CA)
3588 return server_context, other_context, client_context
3589
3590 def check_common_name(self, stats, name):
3591 cert = stats['peercert']
3592 self.assertIn((('commonName', name),), cert['subject'])
3593
3594 @needs_sni
3595 def test_sni_callback(self):
3596 calls = []
3597 server_context, other_context, client_context = self.sni_contexts()
3598
Christian Heimesa170fa12017-09-15 20:27:30 +02003599 client_context.check_hostname = False
3600
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003601 def servername_cb(ssl_sock, server_name, initial_context):
3602 calls.append((server_name, initial_context))
3603 if server_name is not None:
3604 ssl_sock.context = other_context
3605 server_context.set_servername_callback(servername_cb)
3606
3607 stats = server_params_test(client_context, server_context,
3608 chatty=True,
3609 sni_name='supermessage')
3610 # The hostname was fetched properly, and the certificate was
3611 # changed for the connection.
3612 self.assertEqual(calls, [("supermessage", server_context)])
3613 # CERTFILE4 was selected
3614 self.check_common_name(stats, 'fakehostname')
3615
3616 calls = []
3617 # The callback is called with server_name=None
3618 stats = server_params_test(client_context, server_context,
3619 chatty=True,
3620 sni_name=None)
3621 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003622 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003623
3624 # Check disabling the callback
3625 calls = []
3626 server_context.set_servername_callback(None)
3627
3628 stats = server_params_test(client_context, server_context,
3629 chatty=True,
3630 sni_name='notfunny')
3631 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003632 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003633 self.assertEqual(calls, [])
3634
3635 @needs_sni
3636 def test_sni_callback_alert(self):
3637 # Returning a TLS alert is reflected to the connecting client
3638 server_context, other_context, client_context = self.sni_contexts()
3639
3640 def cb_returning_alert(ssl_sock, server_name, initial_context):
3641 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3642 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003643 with self.assertRaises(ssl.SSLError) as cm:
3644 stats = server_params_test(client_context, server_context,
3645 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003646 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003647 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003648
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003649 @needs_sni
3650 def test_sni_callback_raising(self):
3651 # Raising fails the connection with a TLS handshake failure alert.
3652 server_context, other_context, client_context = self.sni_contexts()
3653
3654 def cb_raising(ssl_sock, server_name, initial_context):
3655 1/0
3656 server_context.set_servername_callback(cb_raising)
3657
3658 with self.assertRaises(ssl.SSLError) as cm, \
3659 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003660 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003661 chatty=False,
3662 sni_name='supermessage')
3663 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3664 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003665
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003666 @needs_sni
3667 def test_sni_callback_wrong_return_type(self):
3668 # Returning the wrong return type terminates the TLS connection
3669 # with an internal error alert.
3670 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003671
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003672 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3673 return "foo"
3674 server_context.set_servername_callback(cb_wrong_return_type)
3675
3676 with self.assertRaises(ssl.SSLError) as cm, \
3677 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003678 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003679 chatty=False,
3680 sni_name='supermessage')
3681 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3682 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003683
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003684 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003685 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003686 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3687 client_context.set_ciphers("AES128:AES256")
3688 server_context.set_ciphers("AES256")
3689 alg1 = "AES256"
3690 alg2 = "AES-256"
3691 else:
3692 client_context.set_ciphers("AES:3DES")
3693 server_context.set_ciphers("3DES")
3694 alg1 = "3DES"
3695 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003696
Christian Heimesa170fa12017-09-15 20:27:30 +02003697 stats = server_params_test(client_context, server_context,
3698 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003699 ciphers = stats['server_shared_ciphers'][0]
3700 self.assertGreater(len(ciphers), 0)
3701 for name, tls_version, bits in ciphers:
3702 if not alg1 in name.split("-") and alg2 not in name:
3703 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003704
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003705 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003706 client_context, server_context, hostname = testing_context()
3707 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003708
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003709 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003710 s = client_context.wrap_socket(socket.socket(),
3711 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003712 s.connect((HOST, server.port))
3713 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003714
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003715 self.assertRaises(ValueError, s.read, 1024)
3716 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003717
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003718 def test_sendfile(self):
3719 TEST_DATA = b"x" * 512
3720 with open(support.TESTFN, 'wb') as f:
3721 f.write(TEST_DATA)
3722 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003723 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003724 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003725 context.load_verify_locations(SIGNING_CA)
3726 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727 server = ThreadedEchoServer(context=context, chatty=False)
3728 with server:
3729 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003730 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003731 with open(support.TESTFN, 'rb') as file:
3732 s.sendfile(file)
3733 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003734
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003735 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003736 client_context, server_context, hostname = testing_context()
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003737
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003738 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003739 stats = server_params_test(client_context, server_context,
3740 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003741 session = stats['session']
3742 self.assertTrue(session.id)
3743 self.assertGreater(session.time, 0)
3744 self.assertGreater(session.timeout, 0)
3745 self.assertTrue(session.has_ticket)
3746 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3747 self.assertGreater(session.ticket_lifetime_hint, 0)
3748 self.assertFalse(stats['session_reused'])
3749 sess_stat = server_context.session_stats()
3750 self.assertEqual(sess_stat['accept'], 1)
3751 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003752
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003753 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003754 stats = server_params_test(client_context, server_context,
3755 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003756 sess_stat = server_context.session_stats()
3757 self.assertEqual(sess_stat['accept'], 2)
3758 self.assertEqual(sess_stat['hits'], 1)
3759 self.assertTrue(stats['session_reused'])
3760 session2 = stats['session']
3761 self.assertEqual(session2.id, session.id)
3762 self.assertEqual(session2, session)
3763 self.assertIsNot(session2, session)
3764 self.assertGreaterEqual(session2.time, session.time)
3765 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003766
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003767 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003768 stats = server_params_test(client_context, server_context,
3769 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003770 self.assertFalse(stats['session_reused'])
3771 session3 = stats['session']
3772 self.assertNotEqual(session3.id, session.id)
3773 self.assertNotEqual(session3, session)
3774 sess_stat = server_context.session_stats()
3775 self.assertEqual(sess_stat['accept'], 3)
3776 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003777
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003778 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02003779 stats = server_params_test(client_context, server_context,
3780 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003781 self.assertTrue(stats['session_reused'])
3782 session4 = stats['session']
3783 self.assertEqual(session4.id, session.id)
3784 self.assertEqual(session4, session)
3785 self.assertGreaterEqual(session4.time, session.time)
3786 self.assertGreaterEqual(session4.timeout, session.timeout)
3787 sess_stat = server_context.session_stats()
3788 self.assertEqual(sess_stat['accept'], 4)
3789 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003790
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003791 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003792 client_context, server_context, hostname = testing_context()
3793 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02003794
Christian Heimescb5b68a2017-09-07 18:07:00 -07003795 # TODO: session reuse does not work with TLS 1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02003796 client_context.options |= ssl.OP_NO_TLSv1_3
3797 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07003798
Christian Heimesa170fa12017-09-15 20:27:30 +02003799 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003800 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003801 with client_context.wrap_socket(socket.socket(),
3802 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003803 # session is None before handshake
3804 self.assertEqual(s.session, None)
3805 self.assertEqual(s.session_reused, None)
3806 s.connect((HOST, server.port))
3807 session = s.session
3808 self.assertTrue(session)
3809 with self.assertRaises(TypeError) as e:
3810 s.session = object
3811 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003812
Christian Heimesa170fa12017-09-15 20:27:30 +02003813 with client_context.wrap_socket(socket.socket(),
3814 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003815 s.connect((HOST, server.port))
3816 # cannot set session after handshake
3817 with self.assertRaises(ValueError) as e:
3818 s.session = session
3819 self.assertEqual(str(e.exception),
3820 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003821
Christian Heimesa170fa12017-09-15 20:27:30 +02003822 with client_context.wrap_socket(socket.socket(),
3823 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003824 # can set session before handshake and before the
3825 # connection was established
3826 s.session = session
3827 s.connect((HOST, server.port))
3828 self.assertEqual(s.session.id, session.id)
3829 self.assertEqual(s.session, session)
3830 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003831
Christian Heimesa170fa12017-09-15 20:27:30 +02003832 with client_context2.wrap_socket(socket.socket(),
3833 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003834 # cannot re-use session with a different SSLContext
3835 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003836 s.session = session
3837 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003838 self.assertEqual(str(e.exception),
3839 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003840
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003841
Thomas Woutersed03b412007-08-28 21:37:11 +00003842def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003843 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003844 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003845 plats = {
3846 'Linux': platform.linux_distribution,
3847 'Mac': platform.mac_ver,
3848 'Windows': platform.win32_ver,
3849 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003850 with warnings.catch_warnings():
3851 warnings.filterwarnings(
3852 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003853 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003854 'functions are deprecated .*',
3855 PendingDeprecationWarning,
3856 )
3857 for name, func in plats.items():
3858 plat = func()
3859 if plat and plat[0]:
3860 plat = '%s %r' % (name, plat)
3861 break
3862 else:
3863 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003864 print("test_ssl: testing with %r %r" %
3865 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3866 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003867 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003868 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3869 try:
3870 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3871 except AttributeError:
3872 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003873
Antoine Pitrou152efa22010-05-16 18:19:27 +00003874 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003875 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003876 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003877 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003878 BADCERT, BADKEY, EMPTYCERT]:
3879 if not os.path.exists(filename):
3880 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003881
Martin Panter3840b2a2016-03-27 01:53:46 +00003882 tests = [
3883 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003884 SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00003885 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003886
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003887 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003888 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003889
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003890 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00003891 try:
3892 support.run_unittest(*tests)
3893 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003894 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003895
3896if __name__ == "__main__":
3897 test_main()