blob: 3b34fc0e0033014dddd1b44efbd68d5df57637e9 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
# Sorted contents of ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host name supplied by the shared test-support module.
HOST = support.HOST
# True when the library's version string starts with 'LibreSSL'.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# True only when the library is not LibreSSL and reports version >= 1.1.0.
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
# Value of the PY_SSL_DEFAULT_CIPHERS sysconfig variable.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
def data_file(*name):
    """Return the path of a data file stored alongside this module.

    The positional arguments are path components; they are joined, in
    order, onto the directory that contains this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Each BYTES_* name is the os.fsencode()d (bytes) form of the path above it.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# Password-protected variants; KEY_PASSWORD is presumably their passphrase.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# The "capath" directory and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
# Expected result of decoding CERTFILE; test_parse_cert compares the output
# of ssl._ssl._test_decode_cert(CERTFILE) against this dictionary.
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Jan 17 19:09:06 2028 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': 'F9BA076D5B6ABD9B',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
# Host name that testing_context() returns when given SIGNED_CERTFILE.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
# Expected result of decoding SIGNED_CERTFILE; test_parse_cert compares the
# output of ssl._ssl._test_decode_cert(SIGNED_CERTFILE) against this.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Nov 28 19:09:06 2027 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': '82EDBF41C880919C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
98
# Second certificate/key file and the host name that testing_context()
# returns for it.
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
# ECC certificate/key file (judging by the file name) and its host name.
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately unusable inputs: an empty certificate, a malformed
# certificate, a path that does not exist, and a malformed key.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
# Not defined in all versions of OpenSSL; each name falls back to 0 when the
# ssl module does not provide the attribute.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
127
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    The traceback lines are joined with single spaces. Nothing is written
    unless the test run is verbose.
    """
    traceback_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(traceback_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000133
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +0000137
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL version is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
141
def have_verify_flags():
    """Return True when the OpenSSL version is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
145
def _have_secp_curves():
    """Probe whether a server context accepts the "secp384r1" ECDH curve.

    Returns False when the ssl module reports no ECDH support, or when
    selecting the curve raises ValueError; returns True otherwise.
    """
    if ssl.HAS_ECDH:
        probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe.set_ecdh_curve("secp384r1")
            return True
        except ValueError:
            pass
    return False


# Result of the probe, computed once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
159
160
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    The relation used is: local time = utc time + utc offset. The
    daylight-saving value (time.altzone) is used when the platform defines
    a DST zone and DST is currently in effect, else time.timezone.
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west_of_utc = time.altzone if dst_active else time.timezone
    return -seconds_west_of_utc
166
def asn1time(cert_time):
    """Return *cert_time* adjusted for how the linked OpenSSL prints it.

    Some versions of OpenSSL ignore seconds, see #18207. Only API version
    0.9.8i is affected: there the seconds are zeroed and a leading zero in
    the day is replaced by a space. For every other version the argument
    is returned unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    adjusted = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000180
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned
    unchanged. Otherwise the wrapper tries to create an SSLv2 context on
    each call; if that raises SSLError and both the OpenSSL version and
    the reported Linux distribution match the known patched build,
    unittest.SkipTest is raised. In every other case *func* is called.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            is_patched_openssl = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if is_patched_openssl:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000196
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
198
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
Christian Heimesd0486372016-09-10 23:23:33 +0200200def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
201 cert_reqs=ssl.CERT_NONE, ca_certs=None,
202 ciphers=None, certfile=None, keyfile=None,
203 **kwargs):
204 context = ssl.SSLContext(ssl_version)
205 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200206 if cert_reqs == ssl.CERT_NONE:
207 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200208 context.verify_mode = cert_reqs
209 if ca_certs is not None:
210 context.load_verify_locations(ca_certs)
211 if certfile is not None or keyfile is not None:
212 context.load_cert_chain(certfile, keyfile)
213 if ciphers is not None:
214 context.set_ciphers(ciphers)
215 return context.wrap_socket(sock, **kwargs)
216
Christian Heimesa170fa12017-09-15 20:27:30 +0200217
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError. The client context loads SIGNING_CA as its
    verify location and the server context loads *server_cert* as its
    certificate chain.
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for known_cert, known_hostname in known_certs:
        if server_cert == known_cert:
            hostname = known_hostname
            break
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)

    return client_context, server_context, hostname
237
238
Antoine Pitrou152efa22010-05-16 18:19:27 +0000239class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000240
Antoine Pitrou480a1242010-04-28 21:37:09 +0000241 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000242 ssl.CERT_NONE
243 ssl.CERT_OPTIONAL
244 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100245 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100246 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100247 if ssl.HAS_ECDH:
248 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100249 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
250 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000251 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100252 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700253 ssl.OP_NO_SSLv2
254 ssl.OP_NO_SSLv3
255 ssl.OP_NO_TLSv1
256 ssl.OP_NO_TLSv1_3
257 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
258 ssl.OP_NO_TLSv1_1
259 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200260 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000261
Antoine Pitrou172f0252014-04-18 20:33:08 +0200262 def test_str_for_enums(self):
263 # Make sure that the PROTOCOL_* constants have enum-like string
264 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200265 proto = ssl.PROTOCOL_TLS
266 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200267 ctx = ssl.SSLContext(proto)
268 self.assertIs(ctx.protocol, proto)
269
Antoine Pitrou480a1242010-04-28 21:37:09 +0000270 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000271 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000273 sys.stdout.write("\n RAND_status is %d (%s)\n"
274 % (v, (v and "sufficient randomness") or
275 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200276
277 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
278 self.assertEqual(len(data), 16)
279 self.assertEqual(is_cryptographic, v == 1)
280 if v:
281 data = ssl.RAND_bytes(16)
282 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200283 else:
284 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
Victor Stinner1e81a392013-12-19 16:47:04 +0100286 # negative num is invalid
287 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
288 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
289
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100290 if hasattr(ssl, 'RAND_egd'):
291 self.assertRaises(TypeError, ssl.RAND_egd, 1)
292 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000293 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200294 ssl.RAND_add(b"this is a random bytes object", 75.0)
295 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000296
Christian Heimesf77b4b22013-08-21 13:26:05 +0200297 @unittest.skipUnless(os.name == 'posix', 'requires posix')
298 def test_random_fork(self):
299 status = ssl.RAND_status()
300 if not status:
301 self.fail("OpenSSL's PRNG has insufficient randomness")
302
303 rfd, wfd = os.pipe()
304 pid = os.fork()
305 if pid == 0:
306 try:
307 os.close(rfd)
308 child_random = ssl.RAND_pseudo_bytes(16)[0]
309 self.assertEqual(len(child_random), 16)
310 os.write(wfd, child_random)
311 os.close(wfd)
312 except BaseException:
313 os._exit(1)
314 else:
315 os._exit(0)
316 else:
317 os.close(wfd)
318 self.addCleanup(os.close, rfd)
319 _, status = os.waitpid(pid, 0)
320 self.assertEqual(status, 0)
321
322 child_random = os.read(rfd, 16)
323 self.assertEqual(len(child_random), 16)
324 parent_random = ssl.RAND_pseudo_bytes(16)[0]
325 self.assertEqual(len(parent_random), 16)
326
327 self.assertNotEqual(child_random, parent_random)
328
Antoine Pitrou480a1242010-04-28 21:37:09 +0000329 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000330 # note that this uses an 'unofficial' function in _ssl.c,
331 # provided solely for this test, to exercise the certificate
332 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100333 self.assertEqual(
334 ssl._ssl._test_decode_cert(CERTFILE),
335 CERTFILE_INFO
336 )
337 self.assertEqual(
338 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
339 SIGNED_CERTFILE_INFO
340 )
341
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200342 # Issue #13034: the subjectAltName in some certificates
343 # (notably projects.developer.nokia.com:443) wasn't parsed
344 p = ssl._ssl._test_decode_cert(NOKIACERT)
345 if support.verbose:
346 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
347 self.assertEqual(p['subjectAltName'],
348 (('DNS', 'projects.developer.nokia.com'),
349 ('DNS', 'projects.forum.nokia.com'))
350 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100351 # extra OCSP and AIA fields
352 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
353 self.assertEqual(p['caIssuers'],
354 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
355 self.assertEqual(p['crlDistributionPoints'],
356 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000357
Christian Heimes824f7f32013-08-17 00:54:47 +0200358 def test_parse_cert_CVE_2013_4238(self):
359 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
360 if support.verbose:
361 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
362 subject = ((('countryName', 'US'),),
363 (('stateOrProvinceName', 'Oregon'),),
364 (('localityName', 'Beaverton'),),
365 (('organizationName', 'Python Software Foundation'),),
366 (('organizationalUnitName', 'Python Core Development'),),
367 (('commonName', 'null.python.org\x00example.org'),),
368 (('emailAddress', 'python-dev@python.org'),))
369 self.assertEqual(p['subject'], subject)
370 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200371 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
372 san = (('DNS', 'altnull.python.org\x00example.com'),
373 ('email', 'null@python.org\x00user@example.org'),
374 ('URI', 'http://null.python.org\x00http://example.org'),
375 ('IP Address', '192.0.2.1'),
376 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
377 else:
378 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
379 san = (('DNS', 'altnull.python.org\x00example.com'),
380 ('email', 'null@python.org\x00user@example.org'),
381 ('URI', 'http://null.python.org\x00http://example.org'),
382 ('IP Address', '192.0.2.1'),
383 ('IP Address', '<invalid>'))
384
385 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200386
Christian Heimes1c03abd2016-09-06 23:25:35 +0200387 def test_parse_all_sans(self):
388 p = ssl._ssl._test_decode_cert(ALLSANFILE)
389 self.assertEqual(p['subjectAltName'],
390 (
391 ('DNS', 'allsans'),
392 ('othername', '<unsupported>'),
393 ('othername', '<unsupported>'),
394 ('email', 'user@example.org'),
395 ('DNS', 'www.example.org'),
396 ('DirName',
397 ((('countryName', 'XY'),),
398 (('localityName', 'Castle Anthrax'),),
399 (('organizationName', 'Python Software Foundation'),),
400 (('commonName', 'dirname example'),))),
401 ('URI', 'https://www.python.org/'),
402 ('IP Address', '127.0.0.1'),
403 ('IP Address', '0:0:0:0:0:0:0:1\n'),
404 ('Registered ID', '1.2.3.4.5')
405 )
406 )
407
Antoine Pitrou480a1242010-04-28 21:37:09 +0000408 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000409 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000410 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000411 d1 = ssl.PEM_cert_to_DER_cert(pem)
412 p2 = ssl.DER_cert_to_PEM_cert(d1)
413 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000414 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000415 if not p2.startswith(ssl.PEM_HEADER + '\n'):
416 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
417 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
418 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000419
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000420 def test_openssl_version(self):
421 n = ssl.OPENSSL_VERSION_NUMBER
422 t = ssl.OPENSSL_VERSION_INFO
423 s = ssl.OPENSSL_VERSION
424 self.assertIsInstance(n, int)
425 self.assertIsInstance(t, tuple)
426 self.assertIsInstance(s, str)
427 # Some sanity checks follow
428 # >= 0.9
429 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400430 # < 3.0
431 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000432 major, minor, fix, patch, status = t
433 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400434 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435 self.assertGreaterEqual(minor, 0)
436 self.assertLess(minor, 256)
437 self.assertGreaterEqual(fix, 0)
438 self.assertLess(fix, 256)
439 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100440 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000441 self.assertGreaterEqual(status, 0)
442 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400443 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200444 if IS_LIBRESSL:
445 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100446 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400447 else:
448 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100449 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450
Antoine Pitrou9d543662010-04-23 23:10:32 +0000451 @support.cpython_only
452 def test_refcycle(self):
453 # Issue #7943: an SSL object doesn't create reference cycles with
454 # itself.
455 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200456 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000457 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100458 with support.check_warnings(("", ResourceWarning)):
459 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100460 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000461
Antoine Pitroua468adc2010-09-14 14:43:44 +0000462 def test_wrapped_unconnected(self):
463 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200464 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000465 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200466 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100467 self.assertRaises(OSError, ss.recv, 1)
468 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
469 self.assertRaises(OSError, ss.recvfrom, 1)
470 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
471 self.assertRaises(OSError, ss.send, b'x')
472 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800473 self.assertRaises(NotImplementedError, ss.sendmsg,
474 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000475
Antoine Pitrou40f08742010-04-24 22:04:40 +0000476 def test_timeout(self):
477 # Issue #8524: when creating an SSL socket, the timeout of the
478 # original socket should be retained.
479 for timeout in (None, 0.0, 5.0):
480 s = socket.socket(socket.AF_INET)
481 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200482 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100483 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000484
Christian Heimesd0486372016-09-10 23:23:33 +0200485 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000486 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000487 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000488 "certfile must be specified",
489 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000490 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000491 "certfile must be specified for server-side operations",
492 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000493 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000494 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200495 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100496 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
497 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200498 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200499 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000500 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000501 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000502 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200503 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000504 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000505 ssl.wrap_socket(sock,
506 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000507 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200508 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000509 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000510 ssl.wrap_socket(sock,
511 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000512 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000513
Martin Panter3464ea22016-02-01 21:58:11 +0000514 def bad_cert_test(self, certfile):
515 """Check that trying to use the given client certificate fails"""
516 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
517 certfile)
518 sock = socket.socket()
519 self.addCleanup(sock.close)
520 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200521 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200522 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000523
524 def test_empty_cert(self):
525 """Wrapping with an empty cert file"""
526 self.bad_cert_test("nullcert.pem")
527
528 def test_malformed_cert(self):
529 """Wrapping with a badly formatted certificate (syntax error)"""
530 self.bad_cert_test("badcert.pem")
531
532 def test_malformed_key(self):
533 """Wrapping with a badly formatted key (syntax error)"""
534 self.bad_cert_test("badkey.pem")
535
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000536 def test_match_hostname(self):
537 def ok(cert, hostname):
538 ssl.match_hostname(cert, hostname)
539 def fail(cert, hostname):
540 self.assertRaises(ssl.CertificateError,
541 ssl.match_hostname, cert, hostname)
542
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100543 # -- Hostname matching --
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 cert = {'subject': ((('commonName', 'example.com'),),)}
546 ok(cert, 'example.com')
547 ok(cert, 'ExAmple.cOm')
548 fail(cert, 'www.example.com')
549 fail(cert, '.example.com')
550 fail(cert, 'example.org')
551 fail(cert, 'exampleXcom')
552
553 cert = {'subject': ((('commonName', '*.a.com'),),)}
554 ok(cert, 'foo.a.com')
555 fail(cert, 'bar.foo.a.com')
556 fail(cert, 'a.com')
557 fail(cert, 'Xa.com')
558 fail(cert, '.a.com')
559
Mandeep Singhede2ac92017-11-27 04:01:27 +0530560 # only match wildcards when they are the only thing
561 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000562 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530563 fail(cert, 'foo.com')
564 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000565 fail(cert, 'bar.com')
566 fail(cert, 'foo.a.com')
567 fail(cert, 'bar.foo.com')
568
Christian Heimes824f7f32013-08-17 00:54:47 +0200569 # NULL bytes are bad, CVE-2013-4073
570 cert = {'subject': ((('commonName',
571 'null.python.org\x00example.org'),),)}
572 ok(cert, 'null.python.org\x00example.org') # or raise an error?
573 fail(cert, 'example.org')
574 fail(cert, 'null.python.org')
575
Georg Brandl72c98d32013-10-27 07:16:53 +0100576 # error cases with wildcards
577 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
578 fail(cert, 'bar.foo.a.com')
579 fail(cert, 'a.com')
580 fail(cert, 'Xa.com')
581 fail(cert, '.a.com')
582
583 cert = {'subject': ((('commonName', 'a.*.com'),),)}
584 fail(cert, 'a.foo.com')
585 fail(cert, 'a..com')
586 fail(cert, 'a.com')
587
588 # wildcard doesn't match IDNA prefix 'xn--'
589 idna = 'püthon.python.org'.encode("idna").decode("ascii")
590 cert = {'subject': ((('commonName', idna),),)}
591 ok(cert, idna)
592 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
593 fail(cert, idna)
594 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
595 fail(cert, idna)
596
597 # wildcard in first fragment and IDNA A-labels in sequent fragments
598 # are supported.
599 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
600 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530601 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
602 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100603 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
604 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
605
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000606 # Slightly fake real-world example
607 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
608 'subject': ((('commonName', 'linuxfrz.org'),),),
609 'subjectAltName': (('DNS', 'linuxfr.org'),
610 ('DNS', 'linuxfr.com'),
611 ('othername', '<unsupported>'))}
612 ok(cert, 'linuxfr.org')
613 ok(cert, 'linuxfr.com')
614 # Not a "DNS" entry
615 fail(cert, '<unsupported>')
616 # When there is a subjectAltName, commonName isn't used
617 fail(cert, 'linuxfrz.org')
618
619 # A pristine real-world example
620 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
621 'subject': ((('countryName', 'US'),),
622 (('stateOrProvinceName', 'California'),),
623 (('localityName', 'Mountain View'),),
624 (('organizationName', 'Google Inc'),),
625 (('commonName', 'mail.google.com'),))}
626 ok(cert, 'mail.google.com')
627 fail(cert, 'gmail.com')
628 # Only commonName is considered
629 fail(cert, 'California')
630
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100631 # -- IPv4 matching --
632 cert = {'subject': ((('commonName', 'example.com'),),),
633 'subjectAltName': (('DNS', 'example.com'),
634 ('IP Address', '10.11.12.13'),
635 ('IP Address', '14.15.16.17'))}
636 ok(cert, '10.11.12.13')
637 ok(cert, '14.15.16.17')
638 fail(cert, '14.15.16.18')
639 fail(cert, 'example.net')
640
641 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800642 if hasattr(socket, 'AF_INET6'):
643 cert = {'subject': ((('commonName', 'example.com'),),),
644 'subjectAltName': (
645 ('DNS', 'example.com'),
646 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
647 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
648 ok(cert, '2001::cafe')
649 ok(cert, '2003::baba')
650 fail(cert, '2003::bebe')
651 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100652
653 # -- Miscellaneous --
654
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000655 # Neither commonName nor subjectAltName
656 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
657 'subject': ((('countryName', 'US'),),
658 (('stateOrProvinceName', 'California'),),
659 (('localityName', 'Mountain View'),),
660 (('organizationName', 'Google Inc'),))}
661 fail(cert, 'mail.google.com')
662
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200663 # No DNS entry in subjectAltName but a commonName
664 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
665 'subject': ((('countryName', 'US'),),
666 (('stateOrProvinceName', 'California'),),
667 (('localityName', 'Mountain View'),),
668 (('commonName', 'mail.google.com'),)),
669 'subjectAltName': (('othername', 'blabla'), )}
670 ok(cert, 'mail.google.com')
671
672 # No DNS entry subjectAltName and no commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('organizationName', 'Google Inc'),)),
678 'subjectAltName': (('othername', 'blabla'),)}
679 fail(cert, 'google.com')
680
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000681 # Empty cert / no cert
682 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
683 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
684
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200685 # Issue #17980: avoid denials of service by refusing more than one
686 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800687 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
688 with self.assertRaisesRegex(
689 ssl.CertificateError,
690 "partial wildcards in leftmost label are not supported"):
691 ssl.match_hostname(cert, 'axxb.example.com')
692
693 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
694 with self.assertRaisesRegex(
695 ssl.CertificateError,
696 "wildcard can only be present in the leftmost label"):
697 ssl.match_hostname(cert, 'www.sub.example.com')
698
699 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
700 with self.assertRaisesRegex(
701 ssl.CertificateError,
702 "too many wildcards"):
703 ssl.match_hostname(cert, 'axxbxxc.example.com')
704
705 cert = {'subject': ((('commonName', '*'),),)}
706 with self.assertRaisesRegex(
707 ssl.CertificateError,
708 "sole wildcard without additional labels are not support"):
709 ssl.match_hostname(cert, 'host')
710
711 cert = {'subject': ((('commonName', '*.com'),),)}
712 with self.assertRaisesRegex(
713 ssl.CertificateError,
714 r"hostname 'com' doesn't match '\*.com'"):
715 ssl.match_hostname(cert, 'com')
716
717 # extra checks for _inet_paton()
718 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
719 with self.assertRaises(ValueError):
720 ssl._inet_paton(invalid)
721 for ipaddr in ['127.0.0.1', '192.168.0.1']:
722 self.assertTrue(ssl._inet_paton(ipaddr))
723 if hasattr(socket, 'AF_INET6'):
724 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
725 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200726
Antoine Pitroud5323212010-10-22 18:19:07 +0000727 def test_server_side(self):
728 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200729 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000730 with socket.socket() as sock:
731 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
732 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000733
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type."""
    listener = socket.socket(socket.AF_INET)
    # Register the cleanup before anything can fail, so the listening
    # socket is closed even when bind/connect/wrap or the assertion
    # below raises (a trailing close() would be skipped in that case).
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Closing the client again after the ``with`` block is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200745
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket has no 'tls-unique' channel binding data."""
    # First a client-side wrap, then the same check for a server-side one.
    for wrap_options in ({}, {"server_side": True, "certfile": CERTFILE}):
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_options) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200757
def test_dealloc_warn(self):
    """Dropping an unclosed SSL socket emits a ResourceWarning naming it."""
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    # Capture the repr first: it has to appear in the warning message.
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
765
def test_get_default_verify_paths(self):
    """Default verify paths are a 6-tuple and honour SSL_CERT_* variables."""
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    with support.EnvironmentVarGuard() as env:
        # Point both environment overrides at the test fixtures.
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
777
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    """Check the shape of the entries returned by ssl.enum_certificates()."""
    store_names = ("CA", "ROOT")
    for name in store_names:
        self.assertTrue(ssl.enum_certificates(name))

    # Missing or empty store name
    self.assertRaises(TypeError, ssl.enum_certificates)
    self.assertRaises(WindowsError, ssl.enum_certificates, "")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    collected_oids = set()
    for name in store_names:
        entries = ssl.enum_certificates(name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert_bytes, encoding, trust = entry
            self.assertIsInstance(cert_bytes, bytes)
            self.assertIn(encoding, known_encodings)
            self.assertIsInstance(trust, (set, bool))
            # Only set-valued trust entries contribute OIDs.
            if isinstance(trust, set):
                collected_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, collected_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200802
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    """Check the shape of the entries returned by ssl.enum_crls()."""
    self.assertTrue(ssl.enum_crls("CA"))
    # Missing or empty store name
    self.assertRaises(TypeError, ssl.enum_crls)
    self.assertRaises(WindowsError, ssl.enum_crls, "")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    crl_entries = ssl.enum_crls("CA")
    self.assertIsInstance(crl_entries, list)
    for entry in crl_entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        self.assertIsInstance(entry[0], bytes)
        self.assertIn(entry[1], known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200816
Christian Heimes46bebee2013-06-09 19:03:31 +0200817
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100818 def test_asn1object(self):
819 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
820 '1.3.6.1.5.5.7.3.1')
821
822 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
823 self.assertEqual(val, expected)
824 self.assertEqual(val.nid, 129)
825 self.assertEqual(val.shortname, 'serverAuth')
826 self.assertEqual(val.longname, 'TLS Web Server Authentication')
827 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
828 self.assertIsInstance(val, ssl._ASN1Object)
829 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
830
831 val = ssl._ASN1Object.fromnid(129)
832 self.assertEqual(val, expected)
833 self.assertIsInstance(val, ssl._ASN1Object)
834 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100835 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
836 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100837 for i in range(1000):
838 try:
839 obj = ssl._ASN1Object.fromnid(i)
840 except ValueError:
841 pass
842 else:
843 self.assertIsInstance(obj.nid, int)
844 self.assertIsInstance(obj.shortname, str)
845 self.assertIsInstance(obj.longname, str)
846 self.assertIsInstance(obj.oid, (str, type(None)))
847
848 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
849 self.assertEqual(val, expected)
850 self.assertIsInstance(val, ssl._ASN1Object)
851 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
852 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
853 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100854 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
855 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100856
Christian Heimes72d28502013-11-23 13:56:58 +0100857 def test_purpose_enum(self):
858 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
859 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
860 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
861 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
862 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
863 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
864 '1.3.6.1.5.5.7.3.1')
865
866 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
867 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
868 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
869 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
870 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
871 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
872 '1.3.6.1.5.5.7.3.2')
873
def test_unsupported_dtls(self):
    """Wrapping a datagram socket raises NotImplementedError."""
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    # Through the module's wrapping helper
    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")

    # Through an explicit client context
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_context.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
884
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp* seconds."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
887
def cert_time_fail(self, timestring):
    """Assert that converting *timestring* raises ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
891
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    """Conversion must not depend on the local timezone (issue #19940)."""
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, seconds in known_conversions:
        self.cert_time_ok(cert_time, seconds)
899
def test_cert_time_to_seconds(self):
    """Check accepted and rejected inputs of ssl.cert_time_to_seconds()."""
    timestring = "Jan 5 09:34:43 2018 GMT"
    ts = 1515144883.0
    self.cert_time_ok(timestring, ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)

    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for bad_time in malformed:
        self.cert_time_fail(bad_time)

    newyear_ts = 1230768000.0
    # leap seconds
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
    # same timestamp
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

    seconds_field_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for cert_time, seconds in seconds_field_cases:
        self.cert_time_ok(cert_time, seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
934
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    """`cert_time_to_seconds()` should be locale independent."""
    # Abbreviated name of February as rendered by the active locale.
    localized_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if localized_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent: only the C-locale month name is accepted
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(localized_feb + " 9 00:00:00 2007 GMT")
949
def test_connect_ex_error(self):
    """connect_ex() to a non-listening port returns a connection errno."""
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    port = support.bind_port(placeholder)

    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))

    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
965
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100966
Antoine Pitrou152efa22010-05-16 18:19:27 +0000967class ContextTests(unittest.TestCase):
968
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    """SSLContext accepts every known protocol and rejects bogus ones."""
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)

    # With no argument the context reports PROTOCOL_TLS.
    default_context = ssl.SSLContext()
    self.assertEqual(default_context.protocol, ssl.PROTOCOL_TLS)

    for bogus_protocol in (-1, 42):
        self.assertRaises(ValueError, ssl.SSLContext, bogus_protocol)
977
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    """SSLContext.protocol reports the protocol it was created with."""
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
983
984 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200985 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000986 ctx.set_ciphers("ALL")
987 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000988 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000989 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000990
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    """No enabled cipher suite name contains a disallowed algorithm tag."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    disallowed_tags = ("PSK", "SRP", "MD5", "RC4", "3DES")
    for suite in context.get_ciphers():
        suite_name = suite['name']
        for tag in disallowed_tags:
            self.assertNotIn(tag, suite_name)
1003
Christian Heimes25bfcd52016-09-06 00:04:45 +02001004 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1005 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001006 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001007 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001008 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001009 self.assertIn('AES256-GCM-SHA384', names)
1010 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001011
@skip_if_broken_ubuntu_ssl
def test_options(self):
    """Check the default SSLContext.options and setting/clearing bits."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value, and
    # SSLContext also enables the remaining four options by default.
    expected_default = (
        ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        | OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE
        | OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE
    )
    self.assertEqual(expected_default, ctx.options)

    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected_default | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Clearing option bits is rejected in this configuration.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected_default, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1032
Christian Heimesa170fa12017-09-15 20:27:30 +02001033 def test_verify_mode_protocol(self):
1034 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001035 # Default value
1036 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1037 ctx.verify_mode = ssl.CERT_OPTIONAL
1038 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1039 ctx.verify_mode = ssl.CERT_REQUIRED
1040 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1041 ctx.verify_mode = ssl.CERT_NONE
1042 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1043 with self.assertRaises(TypeError):
1044 ctx.verify_mode = None
1045 with self.assertRaises(ValueError):
1046 ctx.verify_mode = 42
1047
Christian Heimesa170fa12017-09-15 20:27:30 +02001048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1049 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1050 self.assertFalse(ctx.check_hostname)
1051
1052 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1054 self.assertTrue(ctx.check_hostname)
1055
Christian Heimes61d478c2018-01-27 15:51:38 +01001056 def test_hostname_checks_common_name(self):
1057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1058 self.assertTrue(ctx.hostname_checks_common_name)
1059 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1060 ctx.hostname_checks_common_name = True
1061 self.assertTrue(ctx.hostname_checks_common_name)
1062 ctx.hostname_checks_common_name = False
1063 self.assertFalse(ctx.hostname_checks_common_name)
1064 ctx.hostname_checks_common_name = True
1065 self.assertTrue(ctx.hostname_checks_common_name)
1066 else:
1067 with self.assertRaises(AttributeError):
1068 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001069
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    """verify_flags has the expected default and round-trips values."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)

    with self.assertRaises(TypeError):
        context.verify_flags = None
1089
def test_load_cert_chain(self):
    """Exercise SSLContext.load_cert_chain() with good and bad inputs."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(unusable)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for incomplete in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(incomplete)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert; the password may be given as
    # str, bytes or bytearray.
    def password_forms():
        return (KEY_PASSWORD,
                KEY_PASSWORD.encode(),
                bytearray(KEY_PASSWORD.encode()))

    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def supply_str():
        return KEY_PASSWORD

    def supply_bytes():
        return KEY_PASSWORD.encode()

    def supply_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def supply_wrong():
        return "badpass"

    def supply_oversized():
        return b'a' * (1024 * 1024)

    def supply_wrong_type():
        return 9

    def supply_raising():
        raise Exception('getpass error')

    class PasswordProvider:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        supply_str,
        supply_bytes,
        supply_bytearray,
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_oversized)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_raising)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=supply_raising)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001172
def test_load_verify_locations(self):
    """Exercise load_verify_locations() with str/bytes paths and errors."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given as str or bytes, positionally or by keyword.
    for ca_path in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(ca_path)
        context.load_verify_locations(cafile=ca_path, capath=None)

    # At least one location has to be supplied.
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)

    with self.assertRaises(OSError) as missing:
        context.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)

    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
1191
def test_load_verify_cadata(self):
    """Load CA certificates from in-memory PEM and DER data."""
    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def ca_count(context):
        # Number of CA certificates currently held in the store.
        return context.cert_store_stats()["x509_ca"]

    # test cadata
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    ctx = new_context()
    self.assertEqual(ca_count(ctx), 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(ctx), 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = new_context()
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    ctx.load_verify_locations(cadata=both_pem)
    self.assertEqual(ca_count(ctx), 2)

    # with junk around the certs
    ctx = new_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    self.assertEqual(ca_count(ctx), 2)

    # test DER
    ctx = new_context()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(ctx), 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(ctx), 2)

    # combined
    ctx = new_context()
    both_der = b"".join((cacert_der, neuronio_der))
    ctx.load_verify_locations(cadata=both_der)
    self.assertEqual(ca_count(ctx), 2)

    # error cases
    ctx = new_context()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1248
1249
def test_load_dh_params(self):
    """Exercise load_dh_params() with valid, missing and invalid files."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is not exercised on Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)

    # A path argument is mandatory and may not be None.
    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as missing:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # A certificate file is not a DH parameter file.
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1262
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A context created for any entry of PROTOCOLS must report zero for
    # every session counter before it has been used.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        fresh_context = ssl.SSLContext(protocol)
        self.assertEqual(fresh_context.session_stats(), all_zero)
1280
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001281 def test_set_default_verify_paths(self):
1282 # There's not much we can do to test that it acts as expected,
1283 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001284 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001285 ctx.set_default_verify_paths()
1286
Antoine Pitrou501da612011-12-21 09:27:41 +01001287 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001288 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001289 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001290 ctx.set_ecdh_curve("prime256v1")
1291 ctx.set_ecdh_curve(b"prime256v1")
1292 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1293 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1294 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1295 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1296
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None: calling it
    # with no argument is a TypeError ...
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    # ... and so is passing an int, a str, or the context object itself.
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    # Both None and a real callable must be accepted.
    context.set_servername_callback(None)
    context.set_servername_callback(noop_callback)
1311
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The default argument `cycle=ctx` makes the callback refer back to
    # the context it is about to be installed on.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass
    ctx.set_servername_callback(dummycallback)
    # Keep only a weak reference so the test itself does not keep the
    # context alive.
    wr = weakref.ref(ctx)
    # Drop the local strong references, then run the cycle collector.
    del ctx, dummycallback
    gc.collect()
    # A dead weak reference shows the context was actually freed.
    self.assertIs(wr(), None)
1324
def test_cert_store_stats(self):
    # Follow cert_store_stats() through each certificate-loading step.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    nothing_loaded = {'x509_ca': 0, 'crl': 0, 'x509': 0}
    self.assertEqual(context.cert_store_stats(), nothing_loaded)
    # load_cert_chain() must leave the reported stats untouched.
    context.load_cert_chain(CERTFILE)
    self.assertEqual(context.cert_store_stats(), nothing_loaded)
    # Loading CERTFILE as a verify location: one x509 entry, no CA.
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 1})
    # Loading CAFILE_CACERT as well: two x509 entries, one counted as CA.
    context.load_verify_locations(CAFILE_CACERT)
    self.assertEqual(context.cert_store_stats(),
                     {'x509_ca': 1, 'crl': 0, 'x509': 2})
1338
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    # The expected issuer and subject are the same distinguished name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_entry = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_entry])

    # With a true argument the same certificate is returned as DER bytes.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1366
Christian Heimes72d28502013-11-23 13:56:58 +01001367 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001368 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001369 ctx.load_default_certs()
1370
Christian Heimesa170fa12017-09-15 20:27:30 +02001371 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001372 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1373 ctx.load_default_certs()
1374
Christian Heimesa170fa12017-09-15 20:27:30 +02001375 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001376 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1377
Christian Heimesa170fa12017-09-15 20:27:30 +02001378 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001379 self.assertRaises(TypeError, ctx.load_default_certs, None)
1380 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1381
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE set for the duration of the
    # guard, load_default_certs() must end up with exactly one x509
    # entry and no CA or CRL entries.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected_stats)
1391
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: the stats after loading the defaults before this test
    # sets SSL_CERT_DIR / SSL_CERT_FILE.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_default_certs()
    expected_stats = context.cert_store_stats()

    # Same load on a fresh context, this time with the variables set.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        # Exactly one more x509 entry than the baseline is expected.
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1406
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be present in the context's options.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when the corresponding
    # module-level constant is non-zero.
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1421
def test_create_default_context(self):
    # Called without arguments: certificates are required and the
    # hostname is checked.
    context = ssl.create_default_context()

    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # Called with cafile, capath and cadata all supplied at once.
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    context = ssl.create_default_context(
        cafile=SIGNING_CA,
        capath=CAPATH,
        cadata=ca_text,
    )
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(context)

    # Called with the CLIENT_AUTH purpose: verify_mode is CERT_NONE.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001442
def test__create_stdlib_context(self):
    # Defaults: PROTOCOL_TLS, no verification, no hostname check.
    context = ssl._create_stdlib_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self.assertFalse(context.check_hostname)
    self._assert_context_options(context)

    # An explicit protocol is honoured; verification stays off.
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(context)

    # cert_reqs and check_hostname are passed through to the context.
    context = ssl._create_stdlib_context(
        ssl.PROTOCOL_TLSv1,
        cert_reqs=ssl.CERT_REQUIRED,
        check_hostname=True,
    )
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # The CLIENT_AUTH purpose given by keyword.
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001467
def test_check_hostname(self):
    # Walk through the interaction between the check_hostname and
    # verify_mode attributes of a PROTOCOL_TLS context. The steps are
    # order-dependent: each one starts from the state left by the last.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    # Initial state: no hostname check, no certificate verification.
    self.assertFalse(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)

    # Auto set CERT_REQUIRED
    ctx.check_hostname = True
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    # Turning check_hostname off and setting CERT_REQUIRED explicitly
    # leaves check_hostname off.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_REQUIRED
    self.assertFalse(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)

    # Changing verify_mode does not affect check_hostname
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    ctx.check_hostname = False
    self.assertFalse(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    # Auto set
    ctx.check_hostname = True
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)

    # Starting from CERT_OPTIONAL with check_hostname off ...
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_OPTIONAL
    ctx.check_hostname = False
    self.assertFalse(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
    # keep CERT_OPTIONAL
    ctx.check_hostname = True
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)

    # Cannot set CERT_NONE with check_hostname enabled
    with self.assertRaises(ValueError):
        ctx.verify_mode = ssl.CERT_NONE
    # Once check_hostname is disabled again, CERT_NONE is accepted.
    ctx.check_hostname = False
    self.assertFalse(ctx.check_hostname)
    ctx.verify_mode = ssl.CERT_NONE
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001510
Christian Heimes5fe668c2016-09-12 00:01:11 +02001511 def test_context_client_server(self):
1512 # PROTOCOL_TLS_CLIENT has sane defaults
1513 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1514 self.assertTrue(ctx.check_hostname)
1515 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1516
1517 # PROTOCOL_TLS_SERVER has different but also sane defaults
1518 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1519 self.assertFalse(ctx.check_hostname)
1520 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1521
Christian Heimes4df60f12017-09-15 20:26:05 +02001522 def test_context_custom_class(self):
1523 class MySSLSocket(ssl.SSLSocket):
1524 pass
1525
1526 class MySSLObject(ssl.SSLObject):
1527 pass
1528
1529 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1530 ctx.sslsocket_class = MySSLSocket
1531 ctx.sslobject_class = MySSLObject
1532
1533 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1534 self.assertIsInstance(sock, MySSLSocket)
1535 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1536 self.assertIsInstance(obj, MySSLObject)
1537
Antoine Pitrou152efa22010-05-16 18:19:27 +00001538
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno, and the
        # same holds for a subclass.
        for error_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_type(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        self.assertEqual(caught.exception.library, 'PEM')
        self.assertEqual(caught.exception.reason, 'NO_START_LINE')
        message = str(caught.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            # The listener never accepts or answers, so the client's
            # non-blocking handshake cannot complete.
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            plain_client = socket.socket()
            plain_client.connect(listener.getsockname())
            plain_client.setblocking(False)
            with context.wrap_socket(plain_client, False,
                                     do_handshake_on_connect=False) as client:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    client.do_handshake()
                message = str(caught.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # An empty hostname and one starting with a dot are both
        # rejected with ValueError.
        context = ssl.create_default_context()
        for bad_hostname in ("", ".example.org"):
            with self.assertRaises(ValueError):
                context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                                 server_hostname=bad_hostname)
1590
1591
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data comes back in write order; reading drains the buffer.
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() has been called AND all
        # buffered data has been read.
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        # pending tracks the number of buffered, unread bytes.
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Each one-byte read lowers the count by one ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... and each one-byte write raises it by one.
        for buffered in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike.
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        # str, None, bool and int are all rejected by write().
        buf = ssl.MemoryBIO()
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_a_buffer)
1653
1654
Martin Panter3840b2a2016-03-27 01:53:46 +00001655class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001656 """Tests that connect to a simple server running in the background"""
1657
def setUp(self):
    # Create the background echo server for this test, using
    # SIGNED_CERTFILE, and remember the (HOST, port) address that the
    # test methods connect to.
    server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, server.port)
    # Enter the server's context manager by hand (presumably this is
    # what starts it -- confirm against ThreadedEchoServer) and register
    # the matching __exit__ so it runs when the test finishes.
    server.__enter__()
    self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001663
def test_connect(self):
    # With CERT_NONE the connection succeeds and getpeercert() returns
    # an empty dict.
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_NONE) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
        self.assertFalse(client.server_side)

    # this should succeed because we specify the root cert
    with test_wrap_socket(socket.socket(socket.AF_INET),
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as client:
        client.connect(self.server_addr)
        self.assertTrue(client.getpeercert())
        self.assertFalse(client.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001678
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001688
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    # connect_ex() reports success as 0 instead of raising.
    result_code = client.connect_ex(self.server_addr)
    self.assertEqual(0, result_code)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001697
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                        cert_reqs=ssl.CERT_REQUIRED,
                        ca_certs=SIGNING_CA,
                        do_handshake_on_connect=False)
    self.addCleanup(s.close)
    # Switch to non-blocking mode before connecting, so connect_ex()
    # may return before the connection is established.
    s.setblocking(False)
    rc = s.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [s], [], 5.0)
    # Non-blocking handshake
    # Retry do_handshake() until it completes, waiting (up to 5 seconds
    # per attempt) for the socket to become readable or writable
    # according to the kind of "want" error raised.
    while True:
        try:
            s.do_handshake()
            break
        except ssl.SSLWantReadError:
            select.select([s], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [s], [], 5.0)
    # SSL established
    self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001723
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
    # Same with a server hostname
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as client:
        client.connect(self.server_addr)
    # Now require a certificate on the same context.
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
1741
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001752
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The identical scenario is run twice: first with CAPATH, then with
    # a bytes `capath` argument (BYTES_CAPATH).
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001774
def test_connect_cadata(self):
    # Verify the server certificate using CA data passed in memory,
    # first as PEM text and then as the equivalent DER bytes.
    with open(SIGNING_CA) as ca_file:
        pem_text = ca_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    for ca_data in (pem_text, der_bytes):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=ca_data)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001795
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Make sure the socket is released even if connect() or one of the
    # checks below fails before the explicit close() is reached; the
    # other tests in this class register the same cleanup. Closing an
    # already-closed socket is harmless.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    # Reading from the now-closed descriptor must fail with EBADF.
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001814
def test_non_blocking_handshake(self):
    # Connect a plain socket first, make it non-blocking, and only then
    # wrap it, so the TLS handshake has to be driven by hand below.
    s = socket.socket(socket.AF_INET)
    s.connect(self.server_addr)
    s.setblocking(False)
    s = test_wrap_socket(s,
                        cert_reqs=ssl.CERT_NONE,
                        do_handshake_on_connect=False)
    self.addCleanup(s.close)
    # count records how many do_handshake() calls were needed.
    count = 0
    while True:
        try:
            count += 1
            s.do_handshake()
            break
        except ssl.SSLWantReadError:
            # Wait until the socket is readable. No timeout is passed,
            # so this blocks until the socket is ready.
            select.select([s], [], [])
        except ssl.SSLWantWriteError:
            # Wait until the socket is writable (also without timeout).
            select.select([], [s], [])
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001835
def test_get_server_certificate(self):
    # Delegate to the module-level helper, passing the background
    # server's address unpacked into positional arguments and
    # SIGNING_CA as the `cert` keyword.
    _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001838
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    # The check itself lives in the module-level helper, which receives
    # the background server's address unpacked into positional arguments.
    _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001843
def test_ciphers(self):
    # Two valid cipher strings must both allow a connection.
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as client:
            client.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as plain_sock:
            client = test_wrap_socket(plain_sock,
                                      cert_reqs=ssl.CERT_NONE,
                                      ciphers="^$:,;?*'dorothyx")
            client.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001857
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: nothing is listed right after
    # load_verify_locations(), one CA is listed after a connection.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname='localhost') as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
    loaded_cas = context.get_ca_certs()
    self.assertEqual(len(loaded_cas), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001869
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    # Two separate contexts are set up the same way, so they differ
    # only by identity.
    ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx1.load_verify_locations(capath=CAPATH)
    ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx2.load_verify_locations(capath=CAPATH)
    s = socket.socket(socket.AF_INET)
    with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
        ss.connect(self.server_addr)
        # Both the socket and its underlying _sslobj report ctx1 ...
        self.assertIs(ss.context, ctx1)
        self.assertIs(ss._sslobj.context, ctx1)
        # ... and both switch to ctx2 once it is assigned on the socket.
        ss.context = ctx2
        self.assertIs(ss.context, ctx2)
        self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001885
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Drive *func* to completion over a pair of memory BIOs.

    Calls ``func(*args)`` repeatedly.  Whenever it raises an
    ``ssl.SSLError`` whose errno is SSL_ERROR_WANT_READ or
    SSL_ERROR_WANT_WRITE, data is moved between *sock* and the BIOs and
    the call is retried.  Any other ``ssl.SSLError`` is re-raised.

    Parameters:
        sock: object providing ``sendall()`` and ``recv()``.
        incoming: BIO that receives bytes read from *sock*
            (``write()`` / ``write_eof()``).
        outgoing: BIO whose pending bytes (``read()``) are sent to *sock*.
        func: callable to drive; its ``__name__`` is used for logging.
        *args: positional arguments passed to *func* on every attempt.
        timeout: keyword-only, seconds before ``self.fail("timeout")``
            is called (default 10).  It is checked at the start of each
            iteration only.

    Returns the value returned by the first successful ``func(*args)``.

    Raises TypeError for any keyword argument other than ``timeout``, so
    that a misspelled keyword is not silently ignored.
    """
    timeout = kwargs.pop('timeout', 10)
    if kwargs:
        raise TypeError("ssl_io_loop() got unexpected keyword argument(s): %s"
                        % ", ".join(sorted(kwargs)))
    deadline = time.monotonic() + timeout
    count = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        # Named "want" rather than "errno" so that the module-level
        # errno import is not shadowed inside this function.
        want = None
        count += 1
        try:
            ret = func(*args)
        except ssl.SSLError as e:
            if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                               ssl.SSL_ERROR_WANT_WRITE):
                raise
            want = e.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        buf = outgoing.read()
        sock.sendall(buf)
        # If there's no error, we're done. For WANT_READ, we need to get
        # data from the socket and put it in the incoming BIO.
        if want is None:
            break
        elif want == ssl.SSL_ERROR_WANT_READ:
            buf = sock.recv(32768)
            if buf:
                incoming.write(buf)
            else:
                # An empty read means the peer closed the connection.
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (count, func.__name__))
    return ret
1922
def test_bio_handshake(self):
    # Handshake and shutdown through an SSLObject backed by memory BIOs,
    # checking what the object reports before and after the handshake.
    raw_sock = socket.socket(socket.AF_INET)
    self.addCleanup(raw_sock.close)
    raw_sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()

    def pump(operation):
        # Drive one SSLObject operation over the socket/BIO pair.
        return self.ssl_io_loop(raw_sock, incoming, outgoing, operation)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing, False,
                              SIGNED_CERTFILE_HOSTNAME)
    self.assertIs(sslobj._sslobj.owner, sslobj)

    # State before the handshake.
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    pump(sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        pump(sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # Writing after the shutdown attempt must fail.
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
1956
def test_bio_read_write_data(self):
    # Send a request through a BIO-backed SSLObject and check that the
    # server's reply is the lower-cased request.
    raw_sock = socket.socket(socket.AF_INET)
    self.addCleanup(raw_sock.close)
    raw_sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()

    def pump(operation, *op_args):
        # Drive one SSLObject operation over the socket/BIO pair.
        return self.ssl_io_loop(raw_sock, incoming, outgoing,
                                operation, *op_args)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)
    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001972
1973
class NetworkedTests(unittest.TestCase):
    # Tests in this class talk to remote hosts; each one runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A tiny timeout so that the connect attempt cannot complete.
            client.settimeout(0.0000001)
            result = client.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            acceptable = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(result, acceptable)

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both the success and the failure helper against an
        # IPv6-only host name.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            for check in (_test_get_server_certificate,
                          _test_get_server_certificate_fail):
                check(self, ipv6_host, 443)
1995
Martin Panter3840b2a2016-03-27 01:53:46 +00001996
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of (host, port) and fail *test* if it is empty.

    The certificate is fetched twice: first with default arguments, then
    with ``ca_certs=cert``.  In verbose mode the second result is printed.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        report = "\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem)
        sys.stdout.write(report)
2007
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA fails.

    An ``ssl.SSLError`` is the expected outcome (printed in verbose mode).
    If a certificate is returned instead, *test* is failed.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected path.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2017
2018
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002019from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002020
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, for use by the tests.

    The server listens on a port obtained from support.bind_port() and
    hands each accepted connection to a ConnectionHandler thread.  The
    handler is joined before the next accept(), so connections are served
    one at a time.  Usable as a context manager: entering starts the
    thread and waits until it is listening; exiting stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning server, the accepted socket and its address."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock with the server's SSL context (server side).

            Returns True on success.  On failure the error text is appended
            to server.conn_errors, the server is stopped, the connection is
            closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    # handle_error() is defined elsewhere in this file.
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Record what was negotiated so tests can inspect it later.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else from the plain socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the SSL connection if wrapped, else to the plain socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the plain socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over', or an OSError.

            Recognised messages (after stripping whitespace): empty (EOF),
            b'over', b'STARTTLS' and b'ENDTLS' (only when the server was
            created with starttls_server), and b'CB tls-unique'.  Anything
            else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                # Without STARTTLS the connection is encrypted from the start.
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # The OK is sent in the clear, before wrapping.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        # The OK is sent encrypted, before unwrapping.
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        # NOTE(review): uses self.sslconn without checking
                        # that the connection is currently wrapped.
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Prepare the SSL context and bind the listening socket.

        If *context* is given it is used as is and *ssl_version*,
        *certreqs*, *cacerts* and *certificate* are ignored; otherwise a
        new SSLContext is built from them (defaulting to
        PROTOCOL_TLS_SERVER and CERT_NONE).  *npn_protocols*,
        *alpn_protocols* and *ciphers*, when given, are applied to the
        context in either case.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Event set by run() once the server is listening (see start()).
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn(), one entry per
        # wrap attempt.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and wait until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the accept loop to stop and wait for the thread to finish."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept loop: serve connections one at a time until stop()."""
        # The short timeout lets the loop notice self.active going False.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for this connection to finish before accepting
                # the next one.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Make the accept loop exit after its current iteration."""
        self.active = False
2239
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread that runs an asyncore loop.

    Usable as a context manager: entering starts the thread and waits
    until its loop is about to run; exiting stops it, joins the thread
    and closes every channel left in asyncore's socket map.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection channel: TLS handshake, then lower-case echo."""

            def __init__(self, conn, certfile):
                """Wrap *conn* as a server-side SSL socket and start the handshake."""
                # do_handshake_on_connect=False: the handshake is driven
                # step by step through _do_ssl_handshake().
                self.socket = test_wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Handle data already pending in the SSL socket, then return True."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step.

                Want-read/want-write leave the state unchanged so that the
                handshake is retried on the next read event.  EOF and
                ECONNABORTED close the channel; other SSL errors propagate.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # NOTE(review): any other OSError is silently ignored
                    # and leaves _ssl_accepting set.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    # Handshake completed.
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                """Close the channel, logging it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                """Re-raise the exception currently being handled."""
                raise

        def __init__(self, certfile):
            """Bind a listening socket and register it with asyncore."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Create a ConnectionHandler for a newly accepted connection."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler is not stored here.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            """Re-raise the exception currently being handled."""
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer; the thread is not started yet."""
        # Event set by run() once the loop is about to start (see start()).
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the thread and wait until run() has set the flag."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server, join the thread and close remaining channels."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Best effort: every error raised by the loop is
                # swallowed and the loop is simply restarted.
                pass

    def stop(self):
        """Clear the active flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002357
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Start a ThreadedEchoServer with *server_context*, connect a client
    built from *client_context* and exchange *indata* with it.

    *indata* is sent three times (as given, as a bytearray and as a
    memoryview) and every reply must equal ``indata.lower()``; otherwise
    AssertionError is raised.  Returns a dict describing the client
    connection plus the protocol and cipher lists recorded by the server.
    """
    stats = {}
    log_client = connectionchatty and support.verbose
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name,
                                            session=session)
        with client as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for payload in payloads:
                if log_client:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if log_client:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata == indata.lower():
                    continue
                raise AssertionError(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            # Tell the handler that this client is done.
            s.write(b"over\n")
            if log_client:
                sys.stdout.write(" client: closing connection.\n")
            stats.update(
                compression=s.compression(),
                cipher=s.cipher(),
                peercert=s.getpeercert(),
                client_alpn_protocol=s.selected_alpn_protocol(),
                client_npn_protocol=s.selected_npn_protocol(),
                version=s.version(),
                session_reused=s.session_reused,
                session=s.session,
            )
            s.close()
        # Server-side observations, recorded by the connection handler.
        for key, recorded in (
                ('server_alpn_protocols', server.selected_alpn_protocols),
                ('server_npn_protocols', server.selected_npn_protocols),
                ('server_shared_ciphers', server.shared_ciphers)):
            stats[key] = recorded
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002407
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Connect a *client_protocol* client to a *server_protocol* server.

    A true *expect_success* means the connection must succeed and a false
    one means it must fail; AssertionError is raised otherwise.  When
    *expect_success* is a string, the connection must in addition report
    that string as its protocol version.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations that are expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    # Both sides get the same verification mode, certificate and CA.
    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    # The connection succeeded.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2466
2467
2468class ThreadedTests(unittest.TestCase):
2469
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Protocols usable on both sides: one context serves as client and
    # server at once.
    one_sided = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol not in one_sided:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(CERTFILE)
                server_params_test(context, context,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # The dedicated client/server protocols, used the right way round.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Misuse of the one-sided protocols must raise SSLError.  Each entry
    # is (client label, server label, client ctx, server ctx, extra
    # keyword arguments).
    # NOTE(review): the last entry is labelled TLS_CLIENT/TLS_CLIENT but
    # passes server_context as the client, like the first entry -
    # confirm which of label and contexts is intended.
    misuse_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in misuse_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(caught.exception))
2517
def test_getpeercert(self):
    # getpeercert() must fail before the handshake and afterwards return
    # a certificate with the expected subject and validity fields.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as conn:
        conn.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        self.assertRaises(ValueError, conn.getpeercert)
        conn.do_handshake()
        cert = conn.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = conn.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        wanted_rdn = (('organizationName', 'Python Software Foundation'),)
        if wanted_rdn not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, cert)
        valid_from = ssl.cert_time_to_seconds(cert['notBefore'])
        valid_until = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(valid_from, valid_until)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002553
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Verification passes by default, fails with VERIFY_CRL_CHECK_LEAF
    # while no CRL is loaded, and passes again once CRLFILE is loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    def new_server():
        # A fresh echo server is used for every connection attempt.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def new_client():
        return client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)

    def handshake_must_succeed():
        server = new_server()
        with server:
            with new_client() as conn:
                conn.connect((HOST, server.port))
                peer_cert = conn.getpeercert()
                self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    handshake_must_succeed()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = new_server()
    with server:
        with new_client() as conn:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                conn.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    handshake_must_succeed()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002595
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002596 def test_check_hostname(self):
2597 if support.verbose:
2598 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002599
Christian Heimesa170fa12017-09-15 20:27:30 +02002600 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002601
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002602 # correct hostname should verify
2603 server = ThreadedEchoServer(context=server_context, chatty=True)
2604 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002605 with client_context.wrap_socket(socket.socket(),
2606 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002607 s.connect((HOST, server.port))
2608 cert = s.getpeercert()
2609 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002610
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611 # incorrect hostname should raise an exception
2612 server = ThreadedEchoServer(context=server_context, chatty=True)
2613 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002614 with client_context.wrap_socket(socket.socket(),
2615 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002616 with self.assertRaisesRegex(
2617 ssl.CertificateError,
2618 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002619 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002620
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002621 # missing server_hostname arg should cause an exception, too
2622 server = ThreadedEchoServer(context=server_context, chatty=True)
2623 with server:
2624 with socket.socket() as s:
2625 with self.assertRaisesRegex(ValueError,
2626 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002627 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002628
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002629 def test_ecc_cert(self):
2630 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2631 client_context.load_verify_locations(SIGNING_CA)
2632 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2633 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2634
2635 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2636 # load ECC cert
2637 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2638
2639 # correct hostname should verify
2640 server = ThreadedEchoServer(context=server_context, chatty=True)
2641 with server:
2642 with client_context.wrap_socket(socket.socket(),
2643 server_hostname=hostname) as s:
2644 s.connect((HOST, server.port))
2645 cert = s.getpeercert()
2646 self.assertTrue(cert, "Can't get peer certificate.")
2647 cipher = s.cipher()[0].split('-')
2648 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2649
2650 def test_dual_rsa_ecc(self):
2651 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2652 client_context.load_verify_locations(SIGNING_CA)
2653 # only ECDSA certs
2654 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2655 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2656
2657 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2658 # load ECC and RSA key/cert pairs
2659 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2660 server_context.load_cert_chain(SIGNED_CERTFILE)
2661
2662 # correct hostname should verify
2663 server = ThreadedEchoServer(context=server_context, chatty=True)
2664 with server:
2665 with client_context.wrap_socket(socket.socket(),
2666 server_hostname=hostname) as s:
2667 s.connect((HOST, server.port))
2668 cert = s.getpeercert()
2669 self.assertTrue(cert, "Can't get peer certificate.")
2670 cipher = s.cipher()[0].split('-')
2671 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2672
Christian Heimes66e57422018-01-29 14:25:13 +01002673 def test_check_hostname_idn(self):
2674 if support.verbose:
2675 sys.stdout.write("\n")
2676
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002677 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002678 server_context.load_cert_chain(IDNSANSFILE)
2679
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002680 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002681 context.verify_mode = ssl.CERT_REQUIRED
2682 context.check_hostname = True
2683 context.load_verify_locations(SIGNING_CA)
2684
2685 # correct hostname should verify, when specified in several
2686 # different ways
2687 idn_hostnames = [
2688 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002689 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002690 ('xn--knig-5qa.idn.pythontest.net',
2691 'xn--knig-5qa.idn.pythontest.net'),
2692 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002693 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002694
2695 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002696 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002697 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2698 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2699 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002700 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2701
2702 # ('königsgäßchen.idna2008.pythontest.net',
2703 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2704 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2705 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2706 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2707 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2708
Christian Heimes66e57422018-01-29 14:25:13 +01002709 ]
2710 for server_hostname, expected_hostname in idn_hostnames:
2711 server = ThreadedEchoServer(context=server_context, chatty=True)
2712 with server:
2713 with context.wrap_socket(socket.socket(),
2714 server_hostname=server_hostname) as s:
2715 self.assertEqual(s.server_hostname, expected_hostname)
2716 s.connect((HOST, server.port))
2717 cert = s.getpeercert()
2718 self.assertEqual(s.server_hostname, expected_hostname)
2719 self.assertTrue(cert, "Can't get peer certificate.")
2720
2721 with ssl.SSLSocket(socket.socket(),
2722 server_hostname=server_hostname) as s:
2723 s.connect((HOST, server.port))
2724 s.getpeercert()
2725 self.assertEqual(s.server_hostname, expected_hostname)
2726
Christian Heimes66e57422018-01-29 14:25:13 +01002727 # incorrect hostname should raise an exception
2728 server = ThreadedEchoServer(context=server_context, chatty=True)
2729 with server:
2730 with context.wrap_socket(socket.socket(),
2731 server_hostname="python.example.org") as s:
2732 with self.assertRaises(ssl.CertificateError):
2733 s.connect((HOST, server.port))
2734
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002735 def test_wrong_cert(self):
2736 """Connecting when the server rejects the client's certificate
2737
2738 Launch a server with CERT_REQUIRED, and check that trying to
2739 connect to it with a wrong client certificate fails.
2740 """
2741 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2742 "wrongcert.pem")
2743 server = ThreadedEchoServer(CERTFILE,
2744 certreqs=ssl.CERT_REQUIRED,
2745 cacerts=CERTFILE, chatty=False,
2746 connectionchatty=False)
2747 with server, \
2748 socket.socket() as sock, \
Christian Heimesa170fa12017-09-15 20:27:30 +02002749 test_wrap_socket(sock, certfile=certfile) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002750 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002751 # Expect either an SSL error about the server rejecting
2752 # the connection, or a low-level connection reset (which
2753 # sometimes happens on Windows)
2754 s.connect((HOST, server.port))
2755 except ssl.SSLError as e:
2756 if support.verbose:
2757 sys.stdout.write("\nSSLError is %r\n" % e)
2758 except OSError as e:
2759 if e.errno != errno.ECONNRESET:
2760 raise
2761 if support.verbose:
2762 sys.stdout.write("\nsocket.error is %r\n" % e)
2763 else:
2764 self.fail("Use of invalid cert should have failed!")
2765
2766 def test_rude_shutdown(self):
2767 """A brutal shutdown of an SSL server should raise an OSError
2768 in the client when attempting handshake.
2769 """
2770 listener_ready = threading.Event()
2771 listener_gone = threading.Event()
2772
2773 s = socket.socket()
2774 port = support.bind_port(s, HOST)
2775
2776 # `listener` runs in a thread. It sits in an accept() until
2777 # the main thread connects. Then it rudely closes the socket,
2778 # and sets Event `listener_gone` to let the main thread know
2779 # the socket is gone.
2780 def listener():
2781 s.listen()
2782 listener_ready.set()
2783 newsock, addr = s.accept()
2784 newsock.close()
2785 s.close()
2786 listener_gone.set()
2787
2788 def connector():
2789 listener_ready.wait()
2790 with socket.socket() as c:
2791 c.connect((HOST, port))
2792 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002793 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002794 ssl_sock = test_wrap_socket(c)
2795 except OSError:
2796 pass
2797 else:
2798 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002799
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002800 t = threading.Thread(target=listener)
2801 t.start()
2802 try:
2803 connector()
2804 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002805 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002806
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002807 def test_ssl_cert_verify_error(self):
2808 if support.verbose:
2809 sys.stdout.write("\n")
2810
2811 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2812 server_context.load_cert_chain(SIGNED_CERTFILE)
2813
2814 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2815
2816 server = ThreadedEchoServer(context=server_context, chatty=True)
2817 with server:
2818 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002819 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002820 try:
2821 s.connect((HOST, server.port))
2822 except ssl.SSLError as e:
2823 msg = 'unable to get local issuer certificate'
2824 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2825 self.assertEqual(e.verify_code, 20)
2826 self.assertEqual(e.verify_message, msg)
2827 self.assertIn(msg, repr(e))
2828 self.assertIn('certificate verify failed', repr(e))
2829
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002830 @skip_if_broken_ubuntu_ssl
2831 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2832 "OpenSSL is compiled without SSLv2 support")
2833 def test_protocol_sslv2(self):
2834 """Connecting to an SSLv2 server with various client options"""
2835 if support.verbose:
2836 sys.stdout.write("\n")
2837 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2838 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2839 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002840 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002841 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2842 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2843 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2844 # SSLv23 client with specific SSL options
2845 if no_sslv2_implies_sslv3_hello():
2846 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002847 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002848 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002849 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002850 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002851 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002852 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002853
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002854 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002855 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002856 """Connecting to an SSLv23 server with various client options"""
2857 if support.verbose:
2858 sys.stdout.write("\n")
2859 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002860 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002861 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002862 except OSError as x:
2863 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2864 if support.verbose:
2865 sys.stdout.write(
2866 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2867 % str(x))
2868 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002869 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2870 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2871 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002872
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002873 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002874 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2875 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2876 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002877
2878 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002879 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2880 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2881 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002882
2883 # Server with specific SSL options
2884 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002885 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002886 server_options=ssl.OP_NO_SSLv3)
2887 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002888 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002889 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002890 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002891 server_options=ssl.OP_NO_TLSv1)
2892
2893
2894 @skip_if_broken_ubuntu_ssl
2895 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2896 "OpenSSL is compiled without SSLv3 support")
2897 def test_protocol_sslv3(self):
2898 """Connecting to an SSLv3 server with various client options"""
2899 if support.verbose:
2900 sys.stdout.write("\n")
2901 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2902 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2903 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2904 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2905 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002906 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002907 client_options=ssl.OP_NO_SSLv3)
2908 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2909 if no_sslv2_implies_sslv3_hello():
2910 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002911 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912 False, client_options=ssl.OP_NO_SSLv2)
2913
2914 @skip_if_broken_ubuntu_ssl
2915 def test_protocol_tlsv1(self):
2916 """Connecting to a TLSv1 server with various client options"""
2917 if support.verbose:
2918 sys.stdout.write("\n")
2919 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2920 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2921 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2922 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2923 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2924 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2925 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002926 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 client_options=ssl.OP_NO_TLSv1)
2928
2929 @skip_if_broken_ubuntu_ssl
2930 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2931 "TLS version 1.1 not supported.")
2932 def test_protocol_tlsv1_1(self):
2933 """Connecting to a TLSv1.1 server with various client options.
2934 Testing against older TLS versions."""
2935 if support.verbose:
2936 sys.stdout.write("\n")
2937 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2938 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2939 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2940 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2941 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002942 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 client_options=ssl.OP_NO_TLSv1_1)
2944
Christian Heimesa170fa12017-09-15 20:27:30 +02002945 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002946 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2947 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2948
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 @skip_if_broken_ubuntu_ssl
2950 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2951 "TLS version 1.2 not supported.")
2952 def test_protocol_tlsv1_2(self):
2953 """Connecting to a TLSv1.2 server with various client options.
2954 Testing against older TLS versions."""
2955 if support.verbose:
2956 sys.stdout.write("\n")
2957 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2958 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2959 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2960 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2961 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2962 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2963 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002964 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002965 client_options=ssl.OP_NO_TLSv1_2)
2966
Christian Heimesa170fa12017-09-15 20:27:30 +02002967 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002968 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2969 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2970 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2971 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2972
2973 def test_starttls(self):
2974 """Switching from clear text to encrypted and back again."""
2975 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2976
2977 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002978 starttls_server=True,
2979 chatty=True,
2980 connectionchatty=True)
2981 wrapped = False
2982 with server:
2983 s = socket.socket()
2984 s.setblocking(1)
2985 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002986 if support.verbose:
2987 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002988 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002989 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002990 sys.stdout.write(
2991 " client: sending %r...\n" % indata)
2992 if wrapped:
2993 conn.write(indata)
2994 outdata = conn.read()
2995 else:
2996 s.send(indata)
2997 outdata = s.recv(1024)
2998 msg = outdata.strip().lower()
2999 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3000 # STARTTLS ok, switch to secure mode
3001 if support.verbose:
3002 sys.stdout.write(
3003 " client: read %r from server, starting TLS...\n"
3004 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003005 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003006 wrapped = True
3007 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3008 # ENDTLS ok, switch back to clear text
3009 if support.verbose:
3010 sys.stdout.write(
3011 " client: read %r from server, ending TLS...\n"
3012 % msg)
3013 s = conn.unwrap()
3014 wrapped = False
3015 else:
3016 if support.verbose:
3017 sys.stdout.write(
3018 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003019 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003020 sys.stdout.write(" client: closing connection.\n")
3021 if wrapped:
3022 conn.write(b"over\n")
3023 else:
3024 s.send(b"over\n")
3025 if wrapped:
3026 conn.close()
3027 else:
3028 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003029
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003030 def test_socketserver(self):
3031 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003032 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 # try to connect
3034 if support.verbose:
3035 sys.stdout.write('\n')
3036 with open(CERTFILE, 'rb') as f:
3037 d1 = f.read()
3038 d2 = ''
3039 # now fetch the same data from the HTTPS server
3040 url = 'https://localhost:%d/%s' % (
3041 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003042 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003043 f = urllib.request.urlopen(url, context=context)
3044 try:
3045 dlen = f.info().get("content-length")
3046 if dlen and (int(dlen) > 0):
3047 d2 = f.read(int(dlen))
3048 if support.verbose:
3049 sys.stdout.write(
3050 " client: read %d bytes from remote server '%s'\n"
3051 % (len(d2), server))
3052 finally:
3053 f.close()
3054 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003055
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003056 def test_asyncore_server(self):
3057 """Check the example asyncore integration."""
3058 if support.verbose:
3059 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003060
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003061 indata = b"FOO\n"
3062 server = AsyncoreEchoServer(CERTFILE)
3063 with server:
3064 s = test_wrap_socket(socket.socket())
3065 s.connect(('127.0.0.1', server.port))
3066 if support.verbose:
3067 sys.stdout.write(
3068 " client: sending %r...\n" % indata)
3069 s.write(indata)
3070 outdata = s.read()
3071 if support.verbose:
3072 sys.stdout.write(" client: read %r\n" % outdata)
3073 if outdata != indata.lower():
3074 self.fail(
3075 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3076 % (outdata[:20], len(outdata),
3077 indata[:20].lower(), len(indata)))
3078 s.write(b"over\n")
3079 if support.verbose:
3080 sys.stdout.write(" client: closing connection.\n")
3081 s.close()
3082 if support.verbose:
3083 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003084
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003085 def test_recv_send(self):
3086 """Test recv(), send() and friends."""
3087 if support.verbose:
3088 sys.stdout.write("\n")
3089
3090 server = ThreadedEchoServer(CERTFILE,
3091 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003092 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 cacerts=CERTFILE,
3094 chatty=True,
3095 connectionchatty=False)
3096 with server:
3097 s = test_wrap_socket(socket.socket(),
3098 server_side=False,
3099 certfile=CERTFILE,
3100 ca_certs=CERTFILE,
3101 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003102 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003103 s.connect((HOST, server.port))
3104 # helper methods for standardising recv* method signatures
3105 def _recv_into():
3106 b = bytearray(b"\0"*100)
3107 count = s.recv_into(b)
3108 return b[:count]
3109
3110 def _recvfrom_into():
3111 b = bytearray(b"\0"*100)
3112 count, addr = s.recvfrom_into(b)
3113 return b[:count]
3114
3115 # (name, method, expect success?, *args, return value func)
3116 send_methods = [
3117 ('send', s.send, True, [], len),
3118 ('sendto', s.sendto, False, ["some.address"], len),
3119 ('sendall', s.sendall, True, [], lambda x: None),
3120 ]
3121 # (name, method, whether to expect success, *args)
3122 recv_methods = [
3123 ('recv', s.recv, True, []),
3124 ('recvfrom', s.recvfrom, False, ["some.address"]),
3125 ('recv_into', _recv_into, True, []),
3126 ('recvfrom_into', _recvfrom_into, False, []),
3127 ]
3128 data_prefix = "PREFIX_"
3129
3130 for (meth_name, send_meth, expect_success, args,
3131 ret_val_meth) in send_methods:
3132 indata = (data_prefix + meth_name).encode('ascii')
3133 try:
3134 ret = send_meth(indata, *args)
3135 msg = "sending with {}".format(meth_name)
3136 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3137 outdata = s.read()
3138 if outdata != indata.lower():
3139 self.fail(
3140 "While sending with <<{name:s}>> bad data "
3141 "<<{outdata:r}>> ({nout:d}) received; "
3142 "expected <<{indata:r}>> ({nin:d})\n".format(
3143 name=meth_name, outdata=outdata[:20],
3144 nout=len(outdata),
3145 indata=indata[:20], nin=len(indata)
3146 )
3147 )
3148 except ValueError as e:
3149 if expect_success:
3150 self.fail(
3151 "Failed to send with method <<{name:s}>>; "
3152 "expected to succeed.\n".format(name=meth_name)
3153 )
3154 if not str(e).startswith(meth_name):
3155 self.fail(
3156 "Method <<{name:s}>> failed with unexpected "
3157 "exception message: {exp:s}\n".format(
3158 name=meth_name, exp=e
3159 )
3160 )
3161
3162 for meth_name, recv_meth, expect_success, args in recv_methods:
3163 indata = (data_prefix + meth_name).encode('ascii')
3164 try:
3165 s.send(indata)
3166 outdata = recv_meth(*args)
3167 if outdata != indata.lower():
3168 self.fail(
3169 "While receiving with <<{name:s}>> bad data "
3170 "<<{outdata:r}>> ({nout:d}) received; "
3171 "expected <<{indata:r}>> ({nin:d})\n".format(
3172 name=meth_name, outdata=outdata[:20],
3173 nout=len(outdata),
3174 indata=indata[:20], nin=len(indata)
3175 )
3176 )
3177 except ValueError as e:
3178 if expect_success:
3179 self.fail(
3180 "Failed to receive with method <<{name:s}>>; "
3181 "expected to succeed.\n".format(name=meth_name)
3182 )
3183 if not str(e).startswith(meth_name):
3184 self.fail(
3185 "Method <<{name:s}>> failed with unexpected "
3186 "exception message: {exp:s}\n".format(
3187 name=meth_name, exp=e
3188 )
3189 )
3190 # consume data
3191 s.read()
3192
3193 # read(-1, buffer) is supported, even though read(-1) is not
3194 data = b"data"
3195 s.send(data)
3196 buffer = bytearray(len(data))
3197 self.assertEqual(s.read(-1, buffer), len(data))
3198 self.assertEqual(buffer, data)
3199
Christian Heimes888bbdc2017-09-07 14:18:21 -07003200 # sendall accepts bytes-like objects
3201 if ctypes is not None:
3202 ubyte = ctypes.c_ubyte * len(data)
3203 byteslike = ubyte.from_buffer_copy(data)
3204 s.sendall(byteslike)
3205 self.assertEqual(s.read(), data)
3206
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003207 # Make sure sendmsg et al are disallowed to avoid
3208 # inadvertent disclosure of data and/or corruption
3209 # of the encrypted data stream
3210 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3211 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3212 self.assertRaises(NotImplementedError,
3213 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003214 s.write(b"over\n")
3215
3216 self.assertRaises(ValueError, s.recv, -1)
3217 self.assertRaises(ValueError, s.read, -1)
3218
3219 s.close()
3220
3221 def test_recv_zero(self):
3222 server = ThreadedEchoServer(CERTFILE)
3223 server.__enter__()
3224 self.addCleanup(server.__exit__, None, None)
3225 s = socket.create_connection((HOST, server.port))
3226 self.addCleanup(s.close)
3227 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3228 self.addCleanup(s.close)
3229
3230 # recv/read(0) should return no data
3231 s.send(b"data")
3232 self.assertEqual(s.recv(0), b"")
3233 self.assertEqual(s.read(0), b"")
3234 self.assertEqual(s.read(), b"data")
3235
3236 # Should not block if the other end sends no data
3237 s.setblocking(False)
3238 self.assertEqual(s.recv(0), b"")
3239 self.assertEqual(s.recv_into(bytearray()), 0)
3240
3241 def test_nonblocking_send(self):
3242 server = ThreadedEchoServer(CERTFILE,
3243 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003244 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003245 cacerts=CERTFILE,
3246 chatty=True,
3247 connectionchatty=False)
3248 with server:
3249 s = test_wrap_socket(socket.socket(),
3250 server_side=False,
3251 certfile=CERTFILE,
3252 ca_certs=CERTFILE,
3253 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003254 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003255 s.connect((HOST, server.port))
3256 s.setblocking(False)
3257
3258 # If we keep sending data, at some point the buffers
3259 # will be full and the call will block
3260 buf = bytearray(8192)
3261 def fill_buffer():
3262 while True:
3263 s.send(buf)
3264 self.assertRaises((ssl.SSLWantWriteError,
3265 ssl.SSLWantReadError), fill_buffer)
3266
3267 # Now read all the output and discard it
3268 s.setblocking(True)
3269 s.close()
3270
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    # Read by the serving thread through the closure; set to True in the
    # outer ``finally`` below to make its accept loop terminate.
    stop = False

    def serve():
        listener.listen()
        ready.set()
        # Keep every accepted socket referenced: the peer never answers
        # the handshake, and the socket is not closed by garbage
        # collection while the client is still waiting.
        held = []
        while not stop:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                held.append(listener.accept()[0])
        for conn in held:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Case 1: plain TCP connect first, wrapping performs the handshake.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   test_wrap_socket, c)
        finally:
            c.close()
        # Case 2: wrap first, connect() performs the handshake.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   c.connect, (host, port))
        finally:
            c.close()
    finally:
        stop = True
        worker.join()
        listener.close()
3319
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    host = "127.0.0.1"
    raw_listener = socket.socket(socket.AF_INET)
    port = support.bind_port(raw_listener)
    server = context.wrap_socket(raw_listener, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    # Filled in by the serving thread once accept() returns.
    accepted_sock = None
    accepted_addr = None

    def serve():
        nonlocal accepted_sock, accepted_addr
        server.listen()
        # Signal the client, then block in accept() and wait for the
        # connection to be closed by the peer.
        listening.set()
        accepted_sock, accepted_addr = server.accept()
        accepted_sock.recv(1)

    acceptor = threading.Thread(target=serve)
    acceptor.start()
    # The client waits until the server is set up, then connects.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    acceptor.join()
    accepted_sock.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(accepted_sock, ssl.SSLSocket)
    self.assertEqual(accepted_addr, client_addr)
3358
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but unconnected socket must fail
    # with an OSError carrying ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3365
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but unconnected socket must fail
    # with an OSError carrying ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3372
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must be rejected by a server
    # using its default cipher list, with a "no shared cipher" error
    # recorded on the server side.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLS,
                                chatty=False)
    with server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Checked only after the server has been shut down.
    self.assertIn("no shared cipher", server.conn_errors[0])
3387
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                chatty=False)
    with server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            # Before the handshake there is neither a version nor an
            # underlying SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(negotiated, 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(negotiated, ('TLSv1', 'TLSv1.2'))
        # Leaving the ``with`` block closed the socket: both are reset.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3409
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With every older protocol version disabled, the connection must
    # negotiate one of the TLS 1.3 cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # disable all but TLS 1.3
    context.options |= (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            # Early OpenSSL 1.1.1 pre-releases reported 'TLS13-*' names;
            # later builds report the standard 'TLS_*' suite names.
            # Accept both so the test does not depend on the exact build.
            self.assertIn(s.cipher()[0], {
                'TLS13-AES-256-GCM-SHA384',
                'TLS13-CHACHA20-POLY1305-SHA256',
                'TLS13-AES-128-GCM-SHA256',
                'TLS_AES_256_GCM_SHA384',
                'TLS_CHACHA20_POLY1305_SHA256',
                'TLS_AES_128_GCM_SHA256',
            })
3427
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name, so the name-based check below needs TLS <= 1.2.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3447
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the *new* binding; the first one was already
        # validated above, so re-checking it here would verify nothing
        # about the second connection.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
3507
def test_compression(self):
    # The reported compression method must be None, 'ZLIB' or 'RLE'.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    allowed_methods = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed_methods)
3516
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both peers, no compression method
    # may be reported for the connection.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3527
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is indexed for the cipher name; the name is split
    # on "-" to look for a DH key-exchange component.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; indexing it again would only
        # show its first character.
        self.fail("Non-DH cipher: " + cipher)
3540
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on either side of the connection, then
    # check what happens when client and server curves do not match.
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_old_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        # Expected outcome: the handshake is refused.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1:
        self.fail("mismatch curve did not fail")
3577
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
3585
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_context, server_context, hostname = testing_context()
    # Only the server advertises protocols; the client offers none.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
3595
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # Each entry pairs the protocols offered by the client with the
    # protocol expected to be selected (None when nothing matches).
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    # OpenSSL 1.1.0 to 1.1.0e raises handshake error when no protocol
    # is shared, instead of completing without ALPN.
    fails_on_mismatch = (IS_OPENSSL_1_1
                         and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))

    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            # Keep the error in place of the stats for inspection below.
            stats = e

        if expected is None and fails_on_mismatch:
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        reported = stats['server_alpn_protocols']
        server_result = reported[-1] if reported else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3635
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_npn_protocol']
    self.assertIs(negotiated, None)
3643
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Each entry pairs the protocols offered by the client with the
    # protocol both sides are expected to report.
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        reported = stats['server_npn_protocols']
        server_result = reported[-1] if reported else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3669
def sni_contexts(self):
    """Build the contexts used by the SNI callback tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    two PROTOCOL_TLS_SERVER contexts loaded with SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively, and a PROTOCOL_TLS_CLIENT context
    that loads SIGNING_CA as its verify location.
    """
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_side(SIGNED_CERTFILE)
    alternate = server_side(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
3678
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must map ``'peercert'`` to a certificate dict whose
    ``'subject'`` entry contains the ``(('commonName', name),)`` item.
    """
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
3682
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name and can swap
    # the context, and therefore the certificate, of the connection.
    server_context, other_context, client_context = self.sni_contexts()
    client_context.check_hostname = False

    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate loaded into other_context was selected
    self.check_common_name(stats, 'fakehostname')

    # The callback is called with server_name=None
    del seen[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3723
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003737
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0
    server_context.set_servername_callback(cb_raising)

    # stderr is captured so the callback's exception report can be
    # inspected afterwards.
    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003754
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured so the reported TypeError can be inspected
    # afterwards.
    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003772
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must belong to the
    # family that the server was restricted to.
    client_context, server_context, hostname = testing_context()
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, _tls_version, _bits in ciphers:
        # Accept either spelling of the algorithm: as a "-"-separated
        # component of the name, or as a substring of it.
        if alg1 in name.split("-") or alg2 in name:
            continue
        self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003793
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has been closed must
    # raise ValueError.
    client_context, server_context, hostname = testing_context()

    with ThreadedEchoServer(context=server_context, chatty=False) as server:
        closed_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        closed_sock.connect((HOST, server.port))
        closed_sock.close()

        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003806
def test_sendfile(self):
    # Data pushed with SSLSocket.sendfile() must come back unchanged
    # from the server.
    TEST_DATA = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(TEST_DATA)
    self.addCleanup(support.unlink, support.TESTFN)

    # The same context is used for both the server and the client.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    with ThreadedEchoServer(context=context, chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                received = s.recv(1024)
                self.assertEqual(received, TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003823
def test_session(self):
    # Run four connections against the same server context, alternating
    # between fresh and resumed sessions, and check both the session
    # objects and the server-side 'accept'/'hits' counters.
    client_context, server_context, hostname = testing_context()

    # first connection without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 1)
    self.assertEqual(counters['hits'], 0)

    # reuse session
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 2)
    self.assertEqual(counters['hits'], 1)
    self.assertTrue(stats['session_reused'])
    session2 = stats['session']
    self.assertEqual(session2.id, session.id)
    # Equal to the original session, but a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    self.assertFalse(stats['session_reused'])
    session3 = stats['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 3)
    self.assertEqual(counters['hits'], 1)

    # reuse session again
    stats = server_params_test(client_context, server_context,
                               session=session, sni_name=hostname)
    self.assertTrue(stats['session_reused'])
    session4 = stats['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 4)
    self.assertEqual(counters['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003879
def test_session_handling(self):
    # Rules for assigning SSLSocket.session: the value must be an
    # SSLSession, it must be set before the handshake, and it must come
    # from the same SSLContext.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLS 1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def new_client(ctx):
        # Fresh, unconnected client socket wrapped by *ctx*.
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    with ThreadedEchoServer(context=server_context, chatty=False) as server:
        address = (HOST, server.port)

        with new_client(client_context) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect(address)
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with new_client(client_context) as s:
            s.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with new_client(client_context) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect(address)
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with new_client(client_context2) as s:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as e:
                s.session = session
                s.connect(address)
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003929
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003930
def test_main(verbose=False):
    """Run the SSL test suite.

    When ``support.verbose`` is set, print the OpenSSL version, a short
    platform description and a few ``ssl`` module constants.  Then check
    that every certificate fixture exists on disk and run the test
    classes (``NetworkedTests`` is added only when the 'network' resource
    is enabled), bracketed by ``support.threading_setup()`` and
    ``support.threading_cleanup()``.

    The *verbose* argument is accepted but never read; verbosity is taken
    from ``support.verbose``.

    Raises ``support.TestFailed`` if a certificate file is missing.
    """
    if support.verbose:
        import warnings
        # platform.linux_distribution() is deprecated and missing from
        # newer Python versions.  Look it up defensively so that verbose
        # mode does not fail with AttributeError before any test runs.
        plats = {
            'Linux': getattr(platform, 'linux_distribution', None),
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            # The deprecation is reported with a different warning
            # category depending on the Python version; silence both.
            for category in (PendingDeprecationWarning, DeprecationWarning):
                warnings.filterwarnings(
                    'ignore',
                    r'dist\(\) and linux_distribution\(\) '
                    'functions are deprecated .*',
                    category,
                )
            for name, func in plats.items():
                if func is None:
                    # Probe not available on this Python version.
                    continue
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No platform-specific probe returned useful data.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # ssl.OP_NO_TLSv1_1 is not defined in this ssl module.
            pass

    # Fail early, with a clear message, if a certificate fixture is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        # Always run the threading cleanup, even if the test run raised.
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003984
# Run the whole suite via test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()