blob: 73d3e3bbcdaeb8039029429d562aa81caab6d182 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
Christian Heimes05d9fe32018-02-27 08:55:39 +010058WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
Christian Heimesbd5c7d22018-01-20 15:16:30 +010060CERTFILE_INFO = {
61 'issuer': ((('countryName', 'XY'),),
62 (('localityName', 'Castle Anthrax'),),
63 (('organizationName', 'Python Software Foundation'),),
64 (('commonName', 'localhost'),)),
65 'notAfter': 'Jan 17 19:09:06 2028 GMT',
66 'notBefore': 'Jan 19 19:09:06 2018 GMT',
67 'serialNumber': 'F9BA076D5B6ABD9B',
68 'subject': ((('countryName', 'XY'),),
69 (('localityName', 'Castle Anthrax'),),
70 (('organizationName', 'Python Software Foundation'),),
71 (('commonName', 'localhost'),)),
72 'subjectAltName': (('DNS', 'localhost'),),
73 'version': 3
74}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
Christian Heimes22587792013-11-21 23:56:13 +010076# empty CRL
77CRLFILE = data_file("revocation.crl")
78
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010079# Two keys and certs signed by the same CA (for SNI tests)
80SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020081SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
83SIGNED_CERTFILE_INFO = {
84 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
85 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
86 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
87 'issuer': ((('countryName', 'XY'),),
88 (('organizationName', 'Python Software Foundation CA'),),
89 (('commonName', 'our-ca-server'),)),
90 'notAfter': 'Nov 28 19:09:06 2027 GMT',
91 'notBefore': 'Jan 19 19:09:06 2018 GMT',
92 'serialNumber': '82EDBF41C880919C',
93 'subject': ((('countryName', 'XY'),),
94 (('localityName', 'Castle Anthrax'),),
95 (('organizationName', 'Python Software Foundation'),),
96 (('commonName', 'localhost'),)),
97 'subjectAltName': (('DNS', 'localhost'),),
98 'version': 3
99}
100
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100101SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200102SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100103SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
104SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
105
Martin Panter3840b2a2016-03-27 01:53:46 +0000106# Same certificate as pycacert.pem, but without extra text in file
107SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200108# cert with all kinds of subject alt names
109ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100110IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100111
Martin Panter3d81d932016-01-14 09:36:00 +0000112REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000113
114EMPTYCERT = data_file("nullcert.pem")
115BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000116NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000117BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200118NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200119NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400121DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
Thomas Woutersed03b412007-08-28 21:37:11 +0000132def handle_error(prefix):
133 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000134 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000135 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitroub5218772010-05-21 09:56:06 +0000137def can_clear_options():
138 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200139 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
141def no_sslv2_implies_sslv3_hello():
142 # 0.9.7h or higher
143 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
144
Christian Heimes2427b502013-11-23 11:24:32 +0100145def have_verify_flags():
146 # 0.9.8 or higher
147 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
148
Christian Heimesb7b92252018-02-25 09:49:31 +0100149def _have_secp_curves():
150 if not ssl.HAS_ECDH:
151 return False
152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
153 try:
154 ctx.set_ecdh_curve("secp384r1")
155 except ValueError:
156 return False
157 else:
158 return True
159
160
161HAVE_SECP_CURVES = _have_secp_curves()
162
163
Antoine Pitrouc695c952014-04-28 20:57:36 +0200164def utc_offset(): #NOTE: ignore issues like #1647654
165 # local time = utc time + utc offset
166 if time.daylight and time.localtime().tm_isdst > 0:
167 return -time.altzone # seconds
168 return -time.timezone
169
Christian Heimes9424bb42013-06-17 15:32:57 +0200170def asn1time(cert_time):
171 # Some versions of OpenSSL ignore seconds, see #18207
172 # 0.9.8.i
173 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
174 fmt = "%b %d %H:%M:%S %Y GMT"
175 dt = datetime.datetime.strptime(cert_time, fmt)
176 dt = dt.replace(second=0)
177 cert_time = dt.strftime(fmt)
178 # %d adds leading zero but ASN1_TIME_print() uses leading space
179 if cert_time[4] == "0":
180 cert_time = cert_time[:4] + " " + cert_time[5:]
181
182 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100184needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
185
Antoine Pitrou23df4832010-08-04 17:14:06 +0000186
Christian Heimesd0486372016-09-10 23:23:33 +0200187def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
188 cert_reqs=ssl.CERT_NONE, ca_certs=None,
189 ciphers=None, certfile=None, keyfile=None,
190 **kwargs):
191 context = ssl.SSLContext(ssl_version)
192 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200193 if cert_reqs == ssl.CERT_NONE:
194 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200195 context.verify_mode = cert_reqs
196 if ca_certs is not None:
197 context.load_verify_locations(ca_certs)
198 if certfile is not None or keyfile is not None:
199 context.load_cert_chain(certfile, keyfile)
200 if ciphers is not None:
201 context.set_ciphers(ciphers)
202 return context.wrap_socket(sock, **kwargs)
203
Christian Heimesa170fa12017-09-15 20:27:30 +0200204
205def testing_context(server_cert=SIGNED_CERTFILE):
206 """Create context
207
208 client_context, server_context, hostname = testing_context()
209 """
210 if server_cert == SIGNED_CERTFILE:
211 hostname = SIGNED_CERTFILE_HOSTNAME
212 elif server_cert == SIGNED_CERTFILE2:
213 hostname = SIGNED_CERTFILE2_HOSTNAME
214 else:
215 raise ValueError(server_cert)
216
217 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
218 client_context.load_verify_locations(SIGNING_CA)
219
220 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
221 server_context.load_cert_chain(server_cert)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100222 client_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200223
224 return client_context, server_context, hostname
225
226
Antoine Pitrou152efa22010-05-16 18:19:27 +0000227class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000228
Antoine Pitrou480a1242010-04-28 21:37:09 +0000229 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000230 ssl.CERT_NONE
231 ssl.CERT_OPTIONAL
232 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100233 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100234 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100235 if ssl.HAS_ECDH:
236 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100237 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
238 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000239 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100240 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700241 ssl.OP_NO_SSLv2
242 ssl.OP_NO_SSLv3
243 ssl.OP_NO_TLSv1
244 ssl.OP_NO_TLSv1_3
245 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
246 ssl.OP_NO_TLSv1_1
247 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200248 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000249
Christian Heimes9d50ab52018-02-27 10:17:30 +0100250 def test_private_init(self):
251 with self.assertRaisesRegex(TypeError, "public constructor"):
252 with socket.socket() as s:
253 ssl.SSLSocket(s)
254
Antoine Pitrou172f0252014-04-18 20:33:08 +0200255 def test_str_for_enums(self):
256 # Make sure that the PROTOCOL_* constants have enum-like string
257 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200258 proto = ssl.PROTOCOL_TLS
259 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200260 ctx = ssl.SSLContext(proto)
261 self.assertIs(ctx.protocol, proto)
262
Antoine Pitrou480a1242010-04-28 21:37:09 +0000263 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000264 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000265 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000266 sys.stdout.write("\n RAND_status is %d (%s)\n"
267 % (v, (v and "sufficient randomness") or
268 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200269
270 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
271 self.assertEqual(len(data), 16)
272 self.assertEqual(is_cryptographic, v == 1)
273 if v:
274 data = ssl.RAND_bytes(16)
275 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200276 else:
277 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200278
Victor Stinner1e81a392013-12-19 16:47:04 +0100279 # negative num is invalid
280 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
281 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
282
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100283 if hasattr(ssl, 'RAND_egd'):
284 self.assertRaises(TypeError, ssl.RAND_egd, 1)
285 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000286 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200287 ssl.RAND_add(b"this is a random bytes object", 75.0)
288 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000289
Christian Heimesf77b4b22013-08-21 13:26:05 +0200290 @unittest.skipUnless(os.name == 'posix', 'requires posix')
291 def test_random_fork(self):
292 status = ssl.RAND_status()
293 if not status:
294 self.fail("OpenSSL's PRNG has insufficient randomness")
295
296 rfd, wfd = os.pipe()
297 pid = os.fork()
298 if pid == 0:
299 try:
300 os.close(rfd)
301 child_random = ssl.RAND_pseudo_bytes(16)[0]
302 self.assertEqual(len(child_random), 16)
303 os.write(wfd, child_random)
304 os.close(wfd)
305 except BaseException:
306 os._exit(1)
307 else:
308 os._exit(0)
309 else:
310 os.close(wfd)
311 self.addCleanup(os.close, rfd)
312 _, status = os.waitpid(pid, 0)
313 self.assertEqual(status, 0)
314
315 child_random = os.read(rfd, 16)
316 self.assertEqual(len(child_random), 16)
317 parent_random = ssl.RAND_pseudo_bytes(16)[0]
318 self.assertEqual(len(parent_random), 16)
319
320 self.assertNotEqual(child_random, parent_random)
321
Antoine Pitrou480a1242010-04-28 21:37:09 +0000322 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000323 # note that this uses an 'unofficial' function in _ssl.c,
324 # provided solely for this test, to exercise the certificate
325 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100326 self.assertEqual(
327 ssl._ssl._test_decode_cert(CERTFILE),
328 CERTFILE_INFO
329 )
330 self.assertEqual(
331 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
332 SIGNED_CERTFILE_INFO
333 )
334
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200335 # Issue #13034: the subjectAltName in some certificates
336 # (notably projects.developer.nokia.com:443) wasn't parsed
337 p = ssl._ssl._test_decode_cert(NOKIACERT)
338 if support.verbose:
339 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
340 self.assertEqual(p['subjectAltName'],
341 (('DNS', 'projects.developer.nokia.com'),
342 ('DNS', 'projects.forum.nokia.com'))
343 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100344 # extra OCSP and AIA fields
345 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
346 self.assertEqual(p['caIssuers'],
347 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
348 self.assertEqual(p['crlDistributionPoints'],
349 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
Christian Heimes824f7f32013-08-17 00:54:47 +0200351 def test_parse_cert_CVE_2013_4238(self):
352 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
353 if support.verbose:
354 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
355 subject = ((('countryName', 'US'),),
356 (('stateOrProvinceName', 'Oregon'),),
357 (('localityName', 'Beaverton'),),
358 (('organizationName', 'Python Software Foundation'),),
359 (('organizationalUnitName', 'Python Core Development'),),
360 (('commonName', 'null.python.org\x00example.org'),),
361 (('emailAddress', 'python-dev@python.org'),))
362 self.assertEqual(p['subject'], subject)
363 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200364 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
365 san = (('DNS', 'altnull.python.org\x00example.com'),
366 ('email', 'null@python.org\x00user@example.org'),
367 ('URI', 'http://null.python.org\x00http://example.org'),
368 ('IP Address', '192.0.2.1'),
369 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
370 else:
371 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
372 san = (('DNS', 'altnull.python.org\x00example.com'),
373 ('email', 'null@python.org\x00user@example.org'),
374 ('URI', 'http://null.python.org\x00http://example.org'),
375 ('IP Address', '192.0.2.1'),
376 ('IP Address', '<invalid>'))
377
378 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200379
Christian Heimes1c03abd2016-09-06 23:25:35 +0200380 def test_parse_all_sans(self):
381 p = ssl._ssl._test_decode_cert(ALLSANFILE)
382 self.assertEqual(p['subjectAltName'],
383 (
384 ('DNS', 'allsans'),
385 ('othername', '<unsupported>'),
386 ('othername', '<unsupported>'),
387 ('email', 'user@example.org'),
388 ('DNS', 'www.example.org'),
389 ('DirName',
390 ((('countryName', 'XY'),),
391 (('localityName', 'Castle Anthrax'),),
392 (('organizationName', 'Python Software Foundation'),),
393 (('commonName', 'dirname example'),))),
394 ('URI', 'https://www.python.org/'),
395 ('IP Address', '127.0.0.1'),
396 ('IP Address', '0:0:0:0:0:0:0:1\n'),
397 ('Registered ID', '1.2.3.4.5')
398 )
399 )
400
Antoine Pitrou480a1242010-04-28 21:37:09 +0000401 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000402 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000403 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000404 d1 = ssl.PEM_cert_to_DER_cert(pem)
405 p2 = ssl.DER_cert_to_PEM_cert(d1)
406 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000407 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000408 if not p2.startswith(ssl.PEM_HEADER + '\n'):
409 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
410 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
411 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000412
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000413 def test_openssl_version(self):
414 n = ssl.OPENSSL_VERSION_NUMBER
415 t = ssl.OPENSSL_VERSION_INFO
416 s = ssl.OPENSSL_VERSION
417 self.assertIsInstance(n, int)
418 self.assertIsInstance(t, tuple)
419 self.assertIsInstance(s, str)
420 # Some sanity checks follow
421 # >= 0.9
422 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400423 # < 3.0
424 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000425 major, minor, fix, patch, status = t
426 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400427 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000428 self.assertGreaterEqual(minor, 0)
429 self.assertLess(minor, 256)
430 self.assertGreaterEqual(fix, 0)
431 self.assertLess(fix, 256)
432 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100433 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000434 self.assertGreaterEqual(status, 0)
435 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400436 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200437 if IS_LIBRESSL:
438 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100439 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400440 else:
441 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100442 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000443
Antoine Pitrou9d543662010-04-23 23:10:32 +0000444 @support.cpython_only
445 def test_refcycle(self):
446 # Issue #7943: an SSL object doesn't create reference cycles with
447 # itself.
448 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200449 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000450 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100451 with support.check_warnings(("", ResourceWarning)):
452 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100453 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000454
Antoine Pitroua468adc2010-09-14 14:43:44 +0000455 def test_wrapped_unconnected(self):
456 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200457 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000458 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200459 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100460 self.assertRaises(OSError, ss.recv, 1)
461 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
462 self.assertRaises(OSError, ss.recvfrom, 1)
463 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
464 self.assertRaises(OSError, ss.send, b'x')
465 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Christian Heimes141c5e82018-02-24 21:10:57 +0100466 self.assertRaises(NotImplementedError, ss.sendmsg,
467 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000468
Antoine Pitrou40f08742010-04-24 22:04:40 +0000469 def test_timeout(self):
470 # Issue #8524: when creating an SSL socket, the timeout of the
471 # original socket should be retained.
472 for timeout in (None, 0.0, 5.0):
473 s = socket.socket(socket.AF_INET)
474 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200475 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100476 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000477
Christian Heimesd0486372016-09-10 23:23:33 +0200478 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000479 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000480 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000481 "certfile must be specified",
482 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000483 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000484 "certfile must be specified for server-side operations",
485 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000486 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000487 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200488 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100489 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
490 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200491 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200492 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000493 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000494 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000495 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200496 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000497 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000498 ssl.wrap_socket(sock,
499 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000500 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200501 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000502 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000503 ssl.wrap_socket(sock,
504 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000505 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000506
Martin Panter3464ea22016-02-01 21:58:11 +0000507 def bad_cert_test(self, certfile):
508 """Check that trying to use the given client certificate fails"""
509 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
510 certfile)
511 sock = socket.socket()
512 self.addCleanup(sock.close)
513 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200514 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200515 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000516
517 def test_empty_cert(self):
518 """Wrapping with an empty cert file"""
519 self.bad_cert_test("nullcert.pem")
520
521 def test_malformed_cert(self):
522 """Wrapping with a badly formatted certificate (syntax error)"""
523 self.bad_cert_test("badcert.pem")
524
525 def test_malformed_key(self):
526 """Wrapping with a badly formatted key (syntax error)"""
527 self.bad_cert_test("badkey.pem")
528
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000529 def test_match_hostname(self):
530 def ok(cert, hostname):
531 ssl.match_hostname(cert, hostname)
532 def fail(cert, hostname):
533 self.assertRaises(ssl.CertificateError,
534 ssl.match_hostname, cert, hostname)
535
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100536 # -- Hostname matching --
537
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000538 cert = {'subject': ((('commonName', 'example.com'),),)}
539 ok(cert, 'example.com')
540 ok(cert, 'ExAmple.cOm')
541 fail(cert, 'www.example.com')
542 fail(cert, '.example.com')
543 fail(cert, 'example.org')
544 fail(cert, 'exampleXcom')
545
546 cert = {'subject': ((('commonName', '*.a.com'),),)}
547 ok(cert, 'foo.a.com')
548 fail(cert, 'bar.foo.a.com')
549 fail(cert, 'a.com')
550 fail(cert, 'Xa.com')
551 fail(cert, '.a.com')
552
Mandeep Singhede2ac92017-11-27 04:01:27 +0530553 # only match wildcards when they are the only thing
554 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530556 fail(cert, 'foo.com')
557 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000558 fail(cert, 'bar.com')
559 fail(cert, 'foo.a.com')
560 fail(cert, 'bar.foo.com')
561
Christian Heimes824f7f32013-08-17 00:54:47 +0200562 # NULL bytes are bad, CVE-2013-4073
563 cert = {'subject': ((('commonName',
564 'null.python.org\x00example.org'),),)}
565 ok(cert, 'null.python.org\x00example.org') # or raise an error?
566 fail(cert, 'example.org')
567 fail(cert, 'null.python.org')
568
Georg Brandl72c98d32013-10-27 07:16:53 +0100569 # error cases with wildcards
570 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
571 fail(cert, 'bar.foo.a.com')
572 fail(cert, 'a.com')
573 fail(cert, 'Xa.com')
574 fail(cert, '.a.com')
575
576 cert = {'subject': ((('commonName', 'a.*.com'),),)}
577 fail(cert, 'a.foo.com')
578 fail(cert, 'a..com')
579 fail(cert, 'a.com')
580
581 # wildcard doesn't match IDNA prefix 'xn--'
582 idna = 'püthon.python.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
584 ok(cert, idna)
585 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
586 fail(cert, idna)
587 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
588 fail(cert, idna)
589
590 # wildcard in first fragment and IDNA A-labels in sequent fragments
591 # are supported.
592 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
593 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
595 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100596 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
597 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
598
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 # Slightly fake real-world example
600 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
601 'subject': ((('commonName', 'linuxfrz.org'),),),
602 'subjectAltName': (('DNS', 'linuxfr.org'),
603 ('DNS', 'linuxfr.com'),
604 ('othername', '<unsupported>'))}
605 ok(cert, 'linuxfr.org')
606 ok(cert, 'linuxfr.com')
607 # Not a "DNS" entry
608 fail(cert, '<unsupported>')
609 # When there is a subjectAltName, commonName isn't used
610 fail(cert, 'linuxfrz.org')
611
612 # A pristine real-world example
613 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
614 'subject': ((('countryName', 'US'),),
615 (('stateOrProvinceName', 'California'),),
616 (('localityName', 'Mountain View'),),
617 (('organizationName', 'Google Inc'),),
618 (('commonName', 'mail.google.com'),))}
619 ok(cert, 'mail.google.com')
620 fail(cert, 'gmail.com')
621 # Only commonName is considered
622 fail(cert, 'California')
623
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100624 # -- IPv4 matching --
625 cert = {'subject': ((('commonName', 'example.com'),),),
626 'subjectAltName': (('DNS', 'example.com'),
627 ('IP Address', '10.11.12.13'),
628 ('IP Address', '14.15.16.17'))}
629 ok(cert, '10.11.12.13')
630 ok(cert, '14.15.16.17')
631 fail(cert, '14.15.16.18')
632 fail(cert, 'example.net')
633
634 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100635 if hasattr(socket, 'AF_INET6'):
636 cert = {'subject': ((('commonName', 'example.com'),),),
637 'subjectAltName': (
638 ('DNS', 'example.com'),
639 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
640 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
641 ok(cert, '2001::cafe')
642 ok(cert, '2003::baba')
643 fail(cert, '2003::bebe')
644 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100645
646 # -- Miscellaneous --
647
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000648 # Neither commonName nor subjectAltName
649 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
650 'subject': ((('countryName', 'US'),),
651 (('stateOrProvinceName', 'California'),),
652 (('localityName', 'Mountain View'),),
653 (('organizationName', 'Google Inc'),))}
654 fail(cert, 'mail.google.com')
655
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200656 # No DNS entry in subjectAltName but a commonName
657 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('commonName', 'mail.google.com'),)),
662 'subjectAltName': (('othername', 'blabla'), )}
663 ok(cert, 'mail.google.com')
664
665 # No DNS entry subjectAltName and no commonName
666 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),)),
671 'subjectAltName': (('othername', 'blabla'),)}
672 fail(cert, 'google.com')
673
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000674 # Empty cert / no cert
675 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
676 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
677
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200678 # Issue #17980: avoid denials of service by refusing more than one
679 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100680 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
681 with self.assertRaisesRegex(
682 ssl.CertificateError,
683 "partial wildcards in leftmost label are not supported"):
684 ssl.match_hostname(cert, 'axxb.example.com')
685
686 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
687 with self.assertRaisesRegex(
688 ssl.CertificateError,
689 "wildcard can only be present in the leftmost label"):
690 ssl.match_hostname(cert, 'www.sub.example.com')
691
692 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
693 with self.assertRaisesRegex(
694 ssl.CertificateError,
695 "too many wildcards"):
696 ssl.match_hostname(cert, 'axxbxxc.example.com')
697
698 cert = {'subject': ((('commonName', '*'),),)}
699 with self.assertRaisesRegex(
700 ssl.CertificateError,
701 "sole wildcard without additional labels are not support"):
702 ssl.match_hostname(cert, 'host')
703
704 cert = {'subject': ((('commonName', '*.com'),),)}
705 with self.assertRaisesRegex(
706 ssl.CertificateError,
707 r"hostname 'com' doesn't match '\*.com'"):
708 ssl.match_hostname(cert, 'com')
709
710 # extra checks for _inet_paton()
711 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
712 with self.assertRaises(ValueError):
713 ssl._inet_paton(invalid)
714 for ipaddr in ['127.0.0.1', '192.168.0.1']:
715 self.assertTrue(ssl._inet_paton(ipaddr))
716 if hasattr(socket, 'AF_INET6'):
717 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
718 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200719
Antoine Pitroud5323212010-10-22 18:19:07 +0000720 def test_server_side(self):
721 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200722 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000723 with socket.socket() as sock:
724 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
725 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000726
Antoine Pitroud6494802011-07-21 01:11:30 +0200727 def test_unknown_channel_binding(self):
728 # should raise ValueError for unknown type
729 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200730 s.bind(('127.0.0.1', 0))
731 s.listen()
732 c = socket.socket(socket.AF_INET)
733 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200734 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100735 with self.assertRaises(ValueError):
736 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200737 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200738
739 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
740 "'tls-unique' channel binding not available")
741 def test_tls_unique_channel_binding(self):
742 # unconnected should return None for known type
743 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200744 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100745 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200746 # the same for server-side
747 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200748 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100749 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200750
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600751 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200752 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600753 r = repr(ss)
754 with self.assertWarns(ResourceWarning) as cm:
755 ss = None
756 support.gc_collect()
757 self.assertIn(r, str(cm.warning.args[0]))
758
Christian Heimes6d7ad132013-06-09 18:02:55 +0200759 def test_get_default_verify_paths(self):
760 paths = ssl.get_default_verify_paths()
761 self.assertEqual(len(paths), 6)
762 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
763
764 with support.EnvironmentVarGuard() as env:
765 env["SSL_CERT_DIR"] = CAPATH
766 env["SSL_CERT_FILE"] = CERTFILE
767 paths = ssl.get_default_verify_paths()
768 self.assertEqual(paths.cafile, CERTFILE)
769 self.assertEqual(paths.capath, CAPATH)
770
Christian Heimes44109d72013-11-22 01:51:30 +0100771 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
772 def test_enum_certificates(self):
773 self.assertTrue(ssl.enum_certificates("CA"))
774 self.assertTrue(ssl.enum_certificates("ROOT"))
775
776 self.assertRaises(TypeError, ssl.enum_certificates)
777 self.assertRaises(WindowsError, ssl.enum_certificates, "")
778
Christian Heimesc2d65e12013-11-22 16:13:55 +0100779 trust_oids = set()
780 for storename in ("CA", "ROOT"):
781 store = ssl.enum_certificates(storename)
782 self.assertIsInstance(store, list)
783 for element in store:
784 self.assertIsInstance(element, tuple)
785 self.assertEqual(len(element), 3)
786 cert, enc, trust = element
787 self.assertIsInstance(cert, bytes)
788 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
789 self.assertIsInstance(trust, (set, bool))
790 if isinstance(trust, set):
791 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100792
793 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100794 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200795
Christian Heimes46bebee2013-06-09 19:03:31 +0200796 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100797 def test_enum_crls(self):
798 self.assertTrue(ssl.enum_crls("CA"))
799 self.assertRaises(TypeError, ssl.enum_crls)
800 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200801
Christian Heimes44109d72013-11-22 01:51:30 +0100802 crls = ssl.enum_crls("CA")
803 self.assertIsInstance(crls, list)
804 for element in crls:
805 self.assertIsInstance(element, tuple)
806 self.assertEqual(len(element), 2)
807 self.assertIsInstance(element[0], bytes)
808 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200809
Christian Heimes46bebee2013-06-09 19:03:31 +0200810
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100811 def test_asn1object(self):
812 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
813 '1.3.6.1.5.5.7.3.1')
814
815 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
816 self.assertEqual(val, expected)
817 self.assertEqual(val.nid, 129)
818 self.assertEqual(val.shortname, 'serverAuth')
819 self.assertEqual(val.longname, 'TLS Web Server Authentication')
820 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
821 self.assertIsInstance(val, ssl._ASN1Object)
822 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
823
824 val = ssl._ASN1Object.fromnid(129)
825 self.assertEqual(val, expected)
826 self.assertIsInstance(val, ssl._ASN1Object)
827 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100828 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
829 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100830 for i in range(1000):
831 try:
832 obj = ssl._ASN1Object.fromnid(i)
833 except ValueError:
834 pass
835 else:
836 self.assertIsInstance(obj.nid, int)
837 self.assertIsInstance(obj.shortname, str)
838 self.assertIsInstance(obj.longname, str)
839 self.assertIsInstance(obj.oid, (str, type(None)))
840
841 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
845 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
846 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100847 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
848 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100849
Christian Heimes72d28502013-11-23 13:56:58 +0100850 def test_purpose_enum(self):
851 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
852 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
853 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
854 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
855 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
856 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
857 '1.3.6.1.5.5.7.3.1')
858
859 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
860 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
861 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
862 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
863 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
864 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
865 '1.3.6.1.5.5.7.3.2')
866
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100867 def test_unsupported_dtls(self):
868 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
869 self.addCleanup(s.close)
870 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200871 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100872 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200873 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100874 with self.assertRaises(NotImplementedError) as cx:
875 ctx.wrap_socket(s)
876 self.assertEqual(str(cx.exception), "only stream sockets are supported")
877
Antoine Pitrouc695c952014-04-28 20:57:36 +0200878 def cert_time_ok(self, timestring, timestamp):
879 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
880
881 def cert_time_fail(self, timestring):
882 with self.assertRaises(ValueError):
883 ssl.cert_time_to_seconds(timestring)
884
885 @unittest.skipUnless(utc_offset(),
886 'local time needs to be different from UTC')
887 def test_cert_time_to_seconds_timezone(self):
888 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
889 # results if local timezone is not UTC
890 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
891 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
892
893 def test_cert_time_to_seconds(self):
894 timestring = "Jan 5 09:34:43 2018 GMT"
895 ts = 1515144883.0
896 self.cert_time_ok(timestring, ts)
897 # accept keyword parameter, assert its name
898 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
899 # accept both %e and %d (space or zero generated by strftime)
900 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
901 # case-insensitive
902 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
903 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
904 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
905 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
906 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
907 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
908 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
909 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
910
911 newyear_ts = 1230768000.0
912 # leap seconds
913 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
914 # same timestamp
915 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
916
917 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
918 # allow 60th second (even if it is not a leap second)
919 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
920 # allow 2nd leap second for compatibility with time.strptime()
921 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
922 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
923
Mike53f7a7c2017-12-14 14:04:53 +0300924 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200925 # 99991231235959Z (rfc 5280)
926 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
927
928 @support.run_with_locale('LC_ALL', '')
929 def test_cert_time_to_seconds_locale(self):
930 # `cert_time_to_seconds()` should be locale independent
931
932 def local_february_name():
933 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
934
935 if local_february_name().lower() == 'feb':
936 self.skipTest("locale-specific month name needs to be "
937 "different from C locale")
938
939 # locale-independent
940 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
941 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
942
Martin Panter3840b2a2016-03-27 01:53:46 +0000943 def test_connect_ex_error(self):
944 server = socket.socket(socket.AF_INET)
945 self.addCleanup(server.close)
946 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200947 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000948 cert_reqs=ssl.CERT_REQUIRED)
949 self.addCleanup(s.close)
950 rc = s.connect_ex((HOST, port))
951 # Issue #19919: Windows machines or VMs hosted on Windows
952 # machines sometimes return EWOULDBLOCK.
953 errors = (
954 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
955 errno.EWOULDBLOCK,
956 )
957 self.assertIn(rc, errors)
958
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100959
Antoine Pitrou152efa22010-05-16 18:19:27 +0000960class ContextTests(unittest.TestCase):
961
962 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100963 for protocol in PROTOCOLS:
964 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200965 ctx = ssl.SSLContext()
966 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000967 self.assertRaises(ValueError, ssl.SSLContext, -1)
968 self.assertRaises(ValueError, ssl.SSLContext, 42)
969
970 def test_protocol(self):
971 for proto in PROTOCOLS:
972 ctx = ssl.SSLContext(proto)
973 self.assertEqual(ctx.protocol, proto)
974
975 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200976 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977 ctx.set_ciphers("ALL")
978 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000979 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000980 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000981
Christian Heimes892d66e2018-01-29 14:10:18 +0100982 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
983 "Test applies only to Python default ciphers")
984 def test_python_ciphers(self):
985 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
986 ciphers = ctx.get_ciphers()
987 for suite in ciphers:
988 name = suite['name']
989 self.assertNotIn("PSK", name)
990 self.assertNotIn("SRP", name)
991 self.assertNotIn("MD5", name)
992 self.assertNotIn("RC4", name)
993 self.assertNotIn("3DES", name)
994
Christian Heimes25bfcd52016-09-06 00:04:45 +0200995 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
996 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200997 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200998 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200999 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001000 self.assertIn('AES256-GCM-SHA384', names)
1001 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001002
Antoine Pitroub5218772010-05-21 09:56:06 +00001003 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001005 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001006 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001007 # SSLContext also enables these by default
1008 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001009 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1010 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001011 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001012 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001013 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001014 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001015 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1016 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001017 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001018 # Ubuntu has OP_NO_SSLv3 forced on by default
1019 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001020 else:
1021 with self.assertRaises(ValueError):
1022 ctx.options = 0
1023
Christian Heimesa170fa12017-09-15 20:27:30 +02001024 def test_verify_mode_protocol(self):
1025 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026 # Default value
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1028 ctx.verify_mode = ssl.CERT_OPTIONAL
1029 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1030 ctx.verify_mode = ssl.CERT_REQUIRED
1031 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1032 ctx.verify_mode = ssl.CERT_NONE
1033 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1034 with self.assertRaises(TypeError):
1035 ctx.verify_mode = None
1036 with self.assertRaises(ValueError):
1037 ctx.verify_mode = 42
1038
Christian Heimesa170fa12017-09-15 20:27:30 +02001039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1040 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1041 self.assertFalse(ctx.check_hostname)
1042
1043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1045 self.assertTrue(ctx.check_hostname)
1046
Christian Heimes61d478c2018-01-27 15:51:38 +01001047 def test_hostname_checks_common_name(self):
1048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1049 self.assertTrue(ctx.hostname_checks_common_name)
1050 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1051 ctx.hostname_checks_common_name = True
1052 self.assertTrue(ctx.hostname_checks_common_name)
1053 ctx.hostname_checks_common_name = False
1054 self.assertFalse(ctx.hostname_checks_common_name)
1055 ctx.hostname_checks_common_name = True
1056 self.assertTrue(ctx.hostname_checks_common_name)
1057 else:
1058 with self.assertRaises(AttributeError):
1059 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001060
Christian Heimes698dde12018-02-27 11:54:43 +01001061 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1062 "required OpenSSL 1.1.0g")
1063 def test_min_max_version(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1065 self.assertEqual(
1066 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1067 )
1068 self.assertEqual(
1069 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1070 )
1071
1072 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1073 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1074 self.assertEqual(
1075 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1076 )
1077 self.assertEqual(
1078 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1079 )
1080
1081 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1082 ctx.maximum_version = ssl.TLSVersion.TLSv1
1083 self.assertEqual(
1084 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1085 )
1086 self.assertEqual(
1087 ctx.maximum_version, ssl.TLSVersion.TLSv1
1088 )
1089
1090 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1091 self.assertEqual(
1092 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1093 )
1094
1095 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1096 self.assertIn(
1097 ctx.maximum_version,
1098 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1102 self.assertIn(
1103 ctx.minimum_version,
1104 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1105 )
1106
1107 with self.assertRaises(ValueError):
1108 ctx.minimum_version = 42
1109
1110 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1111
1112 self.assertEqual(
1113 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1114 )
1115 self.assertEqual(
1116 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1117 )
1118 with self.assertRaises(ValueError):
1119 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1120 with self.assertRaises(ValueError):
1121 ctx.maximum_version = ssl.TLSVersion.TLSv1
1122
1123
Christian Heimes2427b502013-11-23 11:24:32 +01001124 @unittest.skipUnless(have_verify_flags(),
1125 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001126 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001127 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001128 # default value
1129 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1130 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001131 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1132 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1133 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1134 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1135 ctx.verify_flags = ssl.VERIFY_DEFAULT
1136 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1137 # supports any value
1138 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1139 self.assertEqual(ctx.verify_flags,
1140 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1141 with self.assertRaises(TypeError):
1142 ctx.verify_flags = None
1143
Antoine Pitrou152efa22010-05-16 18:19:27 +00001144 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001145 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001146 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001147 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001148 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1149 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001150 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001151 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001152 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001153 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001154 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001155 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001156 ctx.load_cert_chain(EMPTYCERT)
1157 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001158 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001159 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1160 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1161 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001162 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001163 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001164 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001165 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001166 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001167 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1168 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001170 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001171 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001172 # Password protected key and cert
1173 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1174 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1175 ctx.load_cert_chain(CERTFILE_PROTECTED,
1176 password=bytearray(KEY_PASSWORD.encode()))
1177 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1178 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1179 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1180 bytearray(KEY_PASSWORD.encode()))
1181 with self.assertRaisesRegex(TypeError, "should be a string"):
1182 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1183 with self.assertRaises(ssl.SSLError):
1184 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1185 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1186 # openssl has a fixed limit on the password buffer.
1187 # PEM_BUFSIZE is generally set to 1kb.
1188 # Return a string larger than this.
1189 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1190 # Password callback
1191 def getpass_unicode():
1192 return KEY_PASSWORD
1193 def getpass_bytes():
1194 return KEY_PASSWORD.encode()
1195 def getpass_bytearray():
1196 return bytearray(KEY_PASSWORD.encode())
1197 def getpass_badpass():
1198 return "badpass"
1199 def getpass_huge():
1200 return b'a' * (1024 * 1024)
1201 def getpass_bad_type():
1202 return 9
1203 def getpass_exception():
1204 raise Exception('getpass error')
1205 class GetPassCallable:
1206 def __call__(self):
1207 return KEY_PASSWORD
1208 def getpass(self):
1209 return KEY_PASSWORD
1210 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1211 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1212 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1213 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1214 ctx.load_cert_chain(CERTFILE_PROTECTED,
1215 password=GetPassCallable().getpass)
1216 with self.assertRaises(ssl.SSLError):
1217 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1218 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1219 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1220 with self.assertRaisesRegex(TypeError, "must return a string"):
1221 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1222 with self.assertRaisesRegex(Exception, "getpass error"):
1223 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1224 # Make sure the password function isn't called if it isn't needed
1225 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001226
1227 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001228 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001229 ctx.load_verify_locations(CERTFILE)
1230 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1231 ctx.load_verify_locations(BYTES_CERTFILE)
1232 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1233 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001234 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001235 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001236 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001237 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001238 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001239 ctx.load_verify_locations(BADCERT)
1240 ctx.load_verify_locations(CERTFILE, CAPATH)
1241 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1242
Victor Stinner80f75e62011-01-29 11:31:20 +00001243 # Issue #10989: crash if the second argument type is invalid
1244 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1245
Christian Heimesefff7062013-11-21 03:35:02 +01001246 def test_load_verify_cadata(self):
1247 # test cadata
1248 with open(CAFILE_CACERT) as f:
1249 cacert_pem = f.read()
1250 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1251 with open(CAFILE_NEURONIO) as f:
1252 neuronio_pem = f.read()
1253 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1254
1255 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001256 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001257 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1258 ctx.load_verify_locations(cadata=cacert_pem)
1259 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1260 ctx.load_verify_locations(cadata=neuronio_pem)
1261 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1262 # cert already in hash table
1263 ctx.load_verify_locations(cadata=neuronio_pem)
1264 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1265
1266 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001267 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001268 combined = "\n".join((cacert_pem, neuronio_pem))
1269 ctx.load_verify_locations(cadata=combined)
1270 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1271
1272 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001273 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001274 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1275 neuronio_pem, "tail"]
1276 ctx.load_verify_locations(cadata="\n".join(combined))
1277 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1278
1279 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001280 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001281 ctx.load_verify_locations(cadata=cacert_der)
1282 ctx.load_verify_locations(cadata=neuronio_der)
1283 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1284 # cert already in hash table
1285 ctx.load_verify_locations(cadata=cacert_der)
1286 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1287
1288 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001289 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001290 combined = b"".join((cacert_der, neuronio_der))
1291 ctx.load_verify_locations(cadata=combined)
1292 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1293
1294 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001295 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001296 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1297
1298 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1299 ctx.load_verify_locations(cadata="broken")
1300 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1301 ctx.load_verify_locations(cadata=b"broken")
1302
1303
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001304 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001305 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001306 ctx.load_dh_params(DHFILE)
1307 if os.name != 'nt':
1308 ctx.load_dh_params(BYTES_DHFILE)
1309 self.assertRaises(TypeError, ctx.load_dh_params)
1310 self.assertRaises(TypeError, ctx.load_dh_params, None)
1311 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001312 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001313 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001314 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001315 ctx.load_dh_params(CERTFILE)
1316
Antoine Pitroub0182c82010-10-12 20:09:02 +00001317 def test_session_stats(self):
1318 for proto in PROTOCOLS:
1319 ctx = ssl.SSLContext(proto)
1320 self.assertEqual(ctx.session_stats(), {
1321 'number': 0,
1322 'connect': 0,
1323 'connect_good': 0,
1324 'connect_renegotiate': 0,
1325 'accept': 0,
1326 'accept_good': 0,
1327 'accept_renegotiate': 0,
1328 'hits': 0,
1329 'misses': 0,
1330 'timeouts': 0,
1331 'cache_full': 0,
1332 })
1333
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001334 def test_set_default_verify_paths(self):
1335 # There's not much we can do to test that it acts as expected,
1336 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001337 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001338 ctx.set_default_verify_paths()
1339
Antoine Pitrou501da612011-12-21 09:27:41 +01001340 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001341 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001342 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001343 ctx.set_ecdh_curve("prime256v1")
1344 ctx.set_ecdh_curve(b"prime256v1")
1345 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1346 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1347 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1348 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1349
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001350 @needs_sni
1351 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001352 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001353
1354 # set_servername_callback expects a callable, or None
1355 self.assertRaises(TypeError, ctx.set_servername_callback)
1356 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1357 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1358 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1359
1360 def dummycallback(sock, servername, ctx):
1361 pass
1362 ctx.set_servername_callback(None)
1363 ctx.set_servername_callback(dummycallback)
1364
1365 @needs_sni
1366 def test_sni_callback_refcycle(self):
1367 # Reference cycles through the servername callback are detected
1368 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001369 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001370 def dummycallback(sock, servername, ctx, cycle=ctx):
1371 pass
1372 ctx.set_servername_callback(dummycallback)
1373 wr = weakref.ref(ctx)
1374 del ctx, dummycallback
1375 gc.collect()
1376 self.assertIs(wr(), None)
1377
Christian Heimes9a5395a2013-06-17 15:44:12 +02001378 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001379 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001380 self.assertEqual(ctx.cert_store_stats(),
1381 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1382 ctx.load_cert_chain(CERTFILE)
1383 self.assertEqual(ctx.cert_store_stats(),
1384 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1385 ctx.load_verify_locations(CERTFILE)
1386 self.assertEqual(ctx.cert_store_stats(),
1387 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001388 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001389 self.assertEqual(ctx.cert_store_stats(),
1390 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1391
1392 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001393 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001394 self.assertEqual(ctx.get_ca_certs(), [])
1395 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1396 ctx.load_verify_locations(CERTFILE)
1397 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001398 # but CAFILE_CACERT is a CA cert
1399 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001400 self.assertEqual(ctx.get_ca_certs(),
1401 [{'issuer': ((('organizationName', 'Root CA'),),
1402 (('organizationalUnitName', 'http://www.cacert.org'),),
1403 (('commonName', 'CA Cert Signing Authority'),),
1404 (('emailAddress', 'support@cacert.org'),)),
1405 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1406 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1407 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001408 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001409 'subject': ((('organizationName', 'Root CA'),),
1410 (('organizationalUnitName', 'http://www.cacert.org'),),
1411 (('commonName', 'CA Cert Signing Authority'),),
1412 (('emailAddress', 'support@cacert.org'),)),
1413 'version': 3}])
1414
Martin Panterb55f8b72016-01-14 12:53:56 +00001415 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001416 pem = f.read()
1417 der = ssl.PEM_cert_to_DER_cert(pem)
1418 self.assertEqual(ctx.get_ca_certs(True), [der])
1419
Christian Heimes72d28502013-11-23 13:56:58 +01001420 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001421 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001422 ctx.load_default_certs()
1423
Christian Heimesa170fa12017-09-15 20:27:30 +02001424 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001425 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1426 ctx.load_default_certs()
1427
Christian Heimesa170fa12017-09-15 20:27:30 +02001428 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001429 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1430
Christian Heimesa170fa12017-09-15 20:27:30 +02001431 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001432 self.assertRaises(TypeError, ctx.load_default_certs, None)
1433 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1434
Benjamin Peterson91244e02014-10-03 18:17:15 -04001435 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001436 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001437 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001438 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001439 with support.EnvironmentVarGuard() as env:
1440 env["SSL_CERT_DIR"] = CAPATH
1441 env["SSL_CERT_FILE"] = CERTFILE
1442 ctx.load_default_certs()
1443 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1444
Benjamin Peterson91244e02014-10-03 18:17:15 -04001445 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001446 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001447 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001449 ctx.load_default_certs()
1450 stats = ctx.cert_store_stats()
1451
Christian Heimesa170fa12017-09-15 20:27:30 +02001452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001453 with support.EnvironmentVarGuard() as env:
1454 env["SSL_CERT_DIR"] = CAPATH
1455 env["SSL_CERT_FILE"] = CERTFILE
1456 ctx.load_default_certs()
1457 stats["x509"] += 1
1458 self.assertEqual(ctx.cert_store_stats(), stats)
1459
Christian Heimes358cfd42016-09-10 22:43:48 +02001460 def _assert_context_options(self, ctx):
1461 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1462 if OP_NO_COMPRESSION != 0:
1463 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1464 OP_NO_COMPRESSION)
1465 if OP_SINGLE_DH_USE != 0:
1466 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1467 OP_SINGLE_DH_USE)
1468 if OP_SINGLE_ECDH_USE != 0:
1469 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1470 OP_SINGLE_ECDH_USE)
1471 if OP_CIPHER_SERVER_PREFERENCE != 0:
1472 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1473 OP_CIPHER_SERVER_PREFERENCE)
1474
Christian Heimes4c05b472013-11-23 15:58:30 +01001475 def test_create_default_context(self):
1476 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001477
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001479 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001480 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001481 self._assert_context_options(ctx)
1482
Christian Heimes4c05b472013-11-23 15:58:30 +01001483 with open(SIGNING_CA) as f:
1484 cadata = f.read()
1485 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1486 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001487 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001488 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001489 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001490
1491 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001492 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001493 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001494 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001495
Christian Heimes67986f92013-11-23 22:43:47 +01001496 def test__create_stdlib_context(self):
1497 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001498 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001499 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001500 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001501 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001502
1503 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1504 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1505 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001506 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001507
1508 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001509 cert_reqs=ssl.CERT_REQUIRED,
1510 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001511 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1512 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001513 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001514 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001515
1516 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001517 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001518 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001519 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001520
Christian Heimes1aa9a752013-12-02 02:41:19 +01001521 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001522 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001523 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001524 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001525
Christian Heimese82c0342017-09-15 20:29:57 +02001526 # Auto set CERT_REQUIRED
1527 ctx.check_hostname = True
1528 self.assertTrue(ctx.check_hostname)
1529 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1530 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001531 ctx.verify_mode = ssl.CERT_REQUIRED
1532 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001534
Christian Heimese82c0342017-09-15 20:29:57 +02001535 # Changing verify_mode does not affect check_hostname
1536 ctx.check_hostname = False
1537 ctx.verify_mode = ssl.CERT_NONE
1538 ctx.check_hostname = False
1539 self.assertFalse(ctx.check_hostname)
1540 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1541 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 ctx.check_hostname = True
1543 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1545
1546 ctx.check_hostname = False
1547 ctx.verify_mode = ssl.CERT_OPTIONAL
1548 ctx.check_hostname = False
1549 self.assertFalse(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1551 # keep CERT_OPTIONAL
1552 ctx.check_hostname = True
1553 self.assertTrue(ctx.check_hostname)
1554 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
1556 # Cannot set CERT_NONE with check_hostname enabled
1557 with self.assertRaises(ValueError):
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001561 ctx.verify_mode = ssl.CERT_NONE
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563
Christian Heimes5fe668c2016-09-12 00:01:11 +02001564 def test_context_client_server(self):
1565 # PROTOCOL_TLS_CLIENT has sane defaults
1566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1567 self.assertTrue(ctx.check_hostname)
1568 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1569
1570 # PROTOCOL_TLS_SERVER has different but also sane defaults
1571 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1572 self.assertFalse(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1574
Christian Heimes4df60f12017-09-15 20:26:05 +02001575 def test_context_custom_class(self):
1576 class MySSLSocket(ssl.SSLSocket):
1577 pass
1578
1579 class MySSLObject(ssl.SSLObject):
1580 pass
1581
1582 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1583 ctx.sslsocket_class = MySSLSocket
1584 ctx.sslobject_class = MySSLObject
1585
1586 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1587 self.assertIsInstance(sock, MySSLSocket)
1588 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1589 self.assertIsInstance(obj, MySSLObject)
1590
Antoine Pitrou152efa22010-05-16 18:19:27 +00001591
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001592class SSLErrorTests(unittest.TestCase):
1593
1594 def test_str(self):
1595 # The str() of a SSLError doesn't include the errno
1596 e = ssl.SSLError(1, "foo")
1597 self.assertEqual(str(e), "foo")
1598 self.assertEqual(e.errno, 1)
1599 # Same for a subclass
1600 e = ssl.SSLZeroReturnError(1, "foo")
1601 self.assertEqual(str(e), "foo")
1602 self.assertEqual(e.errno, 1)
1603
1604 def test_lib_reason(self):
1605 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001606 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001607 with self.assertRaises(ssl.SSLError) as cm:
1608 ctx.load_dh_params(CERTFILE)
1609 self.assertEqual(cm.exception.library, 'PEM')
1610 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1611 s = str(cm.exception)
1612 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1613
1614 def test_subclass(self):
1615 # Check that the appropriate SSLError subclass is raised
1616 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001617 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1618 ctx.check_hostname = False
1619 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001620 with socket.socket() as s:
1621 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001622 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001623 c = socket.socket()
1624 c.connect(s.getsockname())
1625 c.setblocking(False)
1626 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001627 with self.assertRaises(ssl.SSLWantReadError) as cm:
1628 c.do_handshake()
1629 s = str(cm.exception)
1630 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1631 # For compatibility
1632 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1633
1634
Christian Heimes61d478c2018-01-27 15:51:38 +01001635 def test_bad_server_hostname(self):
1636 ctx = ssl.create_default_context()
1637 with self.assertRaises(ValueError):
1638 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1639 server_hostname="")
1640 with self.assertRaises(ValueError):
1641 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1642 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001643 with self.assertRaises(TypeError):
1644 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1645 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001646
1647
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001648class MemoryBIOTests(unittest.TestCase):
1649
1650 def test_read_write(self):
1651 bio = ssl.MemoryBIO()
1652 bio.write(b'foo')
1653 self.assertEqual(bio.read(), b'foo')
1654 self.assertEqual(bio.read(), b'')
1655 bio.write(b'foo')
1656 bio.write(b'bar')
1657 self.assertEqual(bio.read(), b'foobar')
1658 self.assertEqual(bio.read(), b'')
1659 bio.write(b'baz')
1660 self.assertEqual(bio.read(2), b'ba')
1661 self.assertEqual(bio.read(1), b'z')
1662 self.assertEqual(bio.read(1), b'')
1663
1664 def test_eof(self):
1665 bio = ssl.MemoryBIO()
1666 self.assertFalse(bio.eof)
1667 self.assertEqual(bio.read(), b'')
1668 self.assertFalse(bio.eof)
1669 bio.write(b'foo')
1670 self.assertFalse(bio.eof)
1671 bio.write_eof()
1672 self.assertFalse(bio.eof)
1673 self.assertEqual(bio.read(2), b'fo')
1674 self.assertFalse(bio.eof)
1675 self.assertEqual(bio.read(1), b'o')
1676 self.assertTrue(bio.eof)
1677 self.assertEqual(bio.read(), b'')
1678 self.assertTrue(bio.eof)
1679
1680 def test_pending(self):
1681 bio = ssl.MemoryBIO()
1682 self.assertEqual(bio.pending, 0)
1683 bio.write(b'foo')
1684 self.assertEqual(bio.pending, 3)
1685 for i in range(3):
1686 bio.read(1)
1687 self.assertEqual(bio.pending, 3-i-1)
1688 for i in range(3):
1689 bio.write(b'x')
1690 self.assertEqual(bio.pending, i+1)
1691 bio.read()
1692 self.assertEqual(bio.pending, 0)
1693
1694 def test_buffer_types(self):
1695 bio = ssl.MemoryBIO()
1696 bio.write(b'foo')
1697 self.assertEqual(bio.read(), b'foo')
1698 bio.write(bytearray(b'bar'))
1699 self.assertEqual(bio.read(), b'bar')
1700 bio.write(memoryview(b'baz'))
1701 self.assertEqual(bio.read(), b'baz')
1702
1703 def test_error_types(self):
1704 bio = ssl.MemoryBIO()
1705 self.assertRaises(TypeError, bio.write, 'foo')
1706 self.assertRaises(TypeError, bio.write, None)
1707 self.assertRaises(TypeError, bio.write, True)
1708 self.assertRaises(TypeError, bio.write, 1)
1709
1710
Christian Heimes9d50ab52018-02-27 10:17:30 +01001711class SSLObjectTests(unittest.TestCase):
1712 def test_private_init(self):
1713 bio = ssl.MemoryBIO()
1714 with self.assertRaisesRegex(TypeError, "public constructor"):
1715 ssl.SSLObject(bio, bio)
1716
1717
Martin Panter3840b2a2016-03-27 01:53:46 +00001718class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001719 """Tests that connect to a simple server running in the background"""
1720
1721 def setUp(self):
1722 server = ThreadedEchoServer(SIGNED_CERTFILE)
1723 self.server_addr = (HOST, server.port)
1724 server.__enter__()
1725 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001726
Antoine Pitrou480a1242010-04-28 21:37:09 +00001727 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001728 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001729 cert_reqs=ssl.CERT_NONE) as s:
1730 s.connect(self.server_addr)
1731 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001732 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001733
Martin Panter3840b2a2016-03-27 01:53:46 +00001734 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001735 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001736 cert_reqs=ssl.CERT_REQUIRED,
1737 ca_certs=SIGNING_CA) as s:
1738 s.connect(self.server_addr)
1739 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001740 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001741
Martin Panter3840b2a2016-03-27 01:53:46 +00001742 def test_connect_fail(self):
1743 # This should fail because we have no verification certs. Connection
1744 # failure crashes ThreadedEchoServer, so run this in an independent
1745 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001746 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001747 cert_reqs=ssl.CERT_REQUIRED)
1748 self.addCleanup(s.close)
1749 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1750 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001751
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001752 def test_connect_ex(self):
1753 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001754 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001755 cert_reqs=ssl.CERT_REQUIRED,
1756 ca_certs=SIGNING_CA)
1757 self.addCleanup(s.close)
1758 self.assertEqual(0, s.connect_ex(self.server_addr))
1759 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001760
1761 def test_non_blocking_connect_ex(self):
1762 # Issue #11326: non-blocking connect_ex() should allow handshake
1763 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001764 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001765 cert_reqs=ssl.CERT_REQUIRED,
1766 ca_certs=SIGNING_CA,
1767 do_handshake_on_connect=False)
1768 self.addCleanup(s.close)
1769 s.setblocking(False)
1770 rc = s.connect_ex(self.server_addr)
1771 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1772 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1773 # Wait for connect to finish
1774 select.select([], [s], [], 5.0)
1775 # Non-blocking handshake
1776 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001777 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001778 s.do_handshake()
1779 break
1780 except ssl.SSLWantReadError:
1781 select.select([s], [], [], 5.0)
1782 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001783 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001784 # SSL established
1785 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001786
Antoine Pitrou152efa22010-05-16 18:19:27 +00001787 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001788 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001789 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001790 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1791 s.connect(self.server_addr)
1792 self.assertEqual({}, s.getpeercert())
1793 # Same with a server hostname
1794 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1795 server_hostname="dummy") as s:
1796 s.connect(self.server_addr)
1797 ctx.verify_mode = ssl.CERT_REQUIRED
1798 # This should succeed because we specify the root cert
1799 ctx.load_verify_locations(SIGNING_CA)
1800 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1801 s.connect(self.server_addr)
1802 cert = s.getpeercert()
1803 self.assertTrue(cert)
1804
1805 def test_connect_with_context_fail(self):
1806 # This should fail because we have no verification certs. Connection
1807 # failure crashes ThreadedEchoServer, so run this in an independent
1808 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001809 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001810 ctx.verify_mode = ssl.CERT_REQUIRED
1811 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1812 self.addCleanup(s.close)
1813 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1814 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001815
1816 def test_connect_capath(self):
1817 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001818 # NOTE: the subject hashing algorithm has been changed between
1819 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1820 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001821 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001822 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001823 ctx.verify_mode = ssl.CERT_REQUIRED
1824 ctx.load_verify_locations(capath=CAPATH)
1825 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1826 s.connect(self.server_addr)
1827 cert = s.getpeercert()
1828 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001829
Martin Panter3840b2a2016-03-27 01:53:46 +00001830 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001831 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001832 ctx.verify_mode = ssl.CERT_REQUIRED
1833 ctx.load_verify_locations(capath=BYTES_CAPATH)
1834 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1835 s.connect(self.server_addr)
1836 cert = s.getpeercert()
1837 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001838
Christian Heimesefff7062013-11-21 03:35:02 +01001839 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001840 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001841 pem = f.read()
1842 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001843 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001844 ctx.verify_mode = ssl.CERT_REQUIRED
1845 ctx.load_verify_locations(cadata=pem)
1846 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1847 s.connect(self.server_addr)
1848 cert = s.getpeercert()
1849 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001850
Martin Panter3840b2a2016-03-27 01:53:46 +00001851 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001853 ctx.verify_mode = ssl.CERT_REQUIRED
1854 ctx.load_verify_locations(cadata=der)
1855 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1856 s.connect(self.server_addr)
1857 cert = s.getpeercert()
1858 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001859
Antoine Pitroue3220242010-04-24 11:13:53 +00001860 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1861 def test_makefile_close(self):
1862 # Issue #5238: creating a file-like object with makefile() shouldn't
1863 # delay closing the underlying "real socket" (here tested with its
1864 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001865 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001866 ss.connect(self.server_addr)
1867 fd = ss.fileno()
1868 f = ss.makefile()
1869 f.close()
1870 # The fd is still open
1871 os.read(fd, 0)
1872 # Closing the SSL socket should close the fd too
1873 ss.close()
1874 gc.collect()
1875 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001876 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001877 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001878
Antoine Pitrou480a1242010-04-28 21:37:09 +00001879 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001880 s = socket.socket(socket.AF_INET)
1881 s.connect(self.server_addr)
1882 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001883 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001884 cert_reqs=ssl.CERT_NONE,
1885 do_handshake_on_connect=False)
1886 self.addCleanup(s.close)
1887 count = 0
1888 while True:
1889 try:
1890 count += 1
1891 s.do_handshake()
1892 break
1893 except ssl.SSLWantReadError:
1894 select.select([s], [], [])
1895 except ssl.SSLWantWriteError:
1896 select.select([], [s], [])
1897 if support.verbose:
1898 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001899
Antoine Pitrou480a1242010-04-28 21:37:09 +00001900 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001901 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001902
Martin Panter3840b2a2016-03-27 01:53:46 +00001903 def test_get_server_certificate_fail(self):
1904 # Connection failure crashes ThreadedEchoServer, so run this in an
1905 # independent test method
1906 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001907
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001908 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001909 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001910 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1911 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001912 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001913 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1914 s.connect(self.server_addr)
1915 # Error checking can happen at instantiation or when connecting
1916 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1917 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001918 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001919 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1920 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001921
Christian Heimes9a5395a2013-06-17 15:44:12 +02001922 def test_get_ca_certs_capath(self):
1923 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001924 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001925 ctx.load_verify_locations(capath=CAPATH)
1926 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001927 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1928 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001929 s.connect(self.server_addr)
1930 cert = s.getpeercert()
1931 self.assertTrue(cert)
1932 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001933
Christian Heimes575596e2013-12-15 21:49:17 +01001934 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001935 def test_context_setget(self):
1936 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001937 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1938 ctx1.load_verify_locations(capath=CAPATH)
1939 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1940 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001942 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001943 ss.connect(self.server_addr)
1944 self.assertIs(ss.context, ctx1)
1945 self.assertIs(ss._sslobj.context, ctx1)
1946 ss.context = ctx2
1947 self.assertIs(ss.context, ctx2)
1948 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001949
1950 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1951 # A simple IO loop. Call func(*args) depending on the error we get
1952 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1953 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001954 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001955 count = 0
1956 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001957 if time.monotonic() > deadline:
1958 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001959 errno = None
1960 count += 1
1961 try:
1962 ret = func(*args)
1963 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001964 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001965 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001966 raise
1967 errno = e.errno
1968 # Get any data from the outgoing BIO irrespective of any error, and
1969 # send it to the socket.
1970 buf = outgoing.read()
1971 sock.sendall(buf)
1972 # If there's no error, we're done. For WANT_READ, we need to get
1973 # data from the socket and put it in the incoming BIO.
1974 if errno is None:
1975 break
1976 elif errno == ssl.SSL_ERROR_WANT_READ:
1977 buf = sock.recv(32768)
1978 if buf:
1979 incoming.write(buf)
1980 else:
1981 incoming.write_eof()
1982 if support.verbose:
1983 sys.stdout.write("Needed %d calls to complete %s().\n"
1984 % (count, func.__name__))
1985 return ret
1986
Martin Panter3840b2a2016-03-27 01:53:46 +00001987 def test_bio_handshake(self):
1988 sock = socket.socket(socket.AF_INET)
1989 self.addCleanup(sock.close)
1990 sock.connect(self.server_addr)
1991 incoming = ssl.MemoryBIO()
1992 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001993 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1994 self.assertTrue(ctx.check_hostname)
1995 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001996 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001997 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1998 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001999 self.assertIs(sslobj._sslobj.owner, sslobj)
2000 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002001 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002002 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002003 self.assertRaises(ValueError, sslobj.getpeercert)
2004 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2005 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2006 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2007 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002008 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002009 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 self.assertTrue(sslobj.getpeercert())
2011 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2012 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2013 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002014 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002015 except ssl.SSLSyscallError:
2016 # If the server shuts down the TCP connection without sending a
2017 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2018 pass
2019 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2020
2021 def test_bio_read_write_data(self):
2022 sock = socket.socket(socket.AF_INET)
2023 self.addCleanup(sock.close)
2024 sock.connect(self.server_addr)
2025 incoming = ssl.MemoryBIO()
2026 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002027 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002028 ctx.verify_mode = ssl.CERT_NONE
2029 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2030 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2031 req = b'FOO\n'
2032 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2033 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2034 self.assertEqual(buf, b'foo\n')
2035 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002036
2037
Martin Panter3840b2a2016-03-27 01:53:46 +00002038class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002039
Martin Panter3840b2a2016-03-27 01:53:46 +00002040 def test_timeout_connect_ex(self):
2041 # Issue #12065: on a timeout, connect_ex() should return the original
2042 # errno (mimicking the behaviour of non-SSL sockets).
2043 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002044 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002045 cert_reqs=ssl.CERT_REQUIRED,
2046 do_handshake_on_connect=False)
2047 self.addCleanup(s.close)
2048 s.settimeout(0.0000001)
2049 rc = s.connect_ex((REMOTE_HOST, 443))
2050 if rc == 0:
2051 self.skipTest("REMOTE_HOST responded too quickly")
2052 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2053
2054 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2055 def test_get_server_certificate_ipv6(self):
2056 with support.transient_internet('ipv6.google.com'):
2057 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2058 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2059
Martin Panter3840b2a2016-03-27 01:53:46 +00002060
2061def _test_get_server_certificate(test, host, port, cert=None):
2062 pem = ssl.get_server_certificate((host, port))
2063 if not pem:
2064 test.fail("No server certificate on %s:%s!" % (host, port))
2065
2066 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2067 if not pem:
2068 test.fail("No server certificate on %s:%s!" % (host, port))
2069 if support.verbose:
2070 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2071
2072def _test_get_server_certificate_fail(test, host, port):
2073 try:
2074 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2075 except ssl.SSLError as x:
2076 #should fail
2077 if support.verbose:
2078 sys.stdout.write("%s\n" % x)
2079 else:
2080 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2081
2082
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002083from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002084
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002085class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002086
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002087 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002088
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002089 """A mildly complicated class, because we want it to work both
2090 with and without the SSL wrapper around the socket connection, so
2091 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002092
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002093 def __init__(self, server, connsock, addr):
2094 self.server = server
2095 self.running = False
2096 self.sock = connsock
2097 self.addr = addr
2098 self.sock.setblocking(1)
2099 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002100 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002101 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002102
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002103 def wrap_conn(self):
2104 try:
2105 self.sslconn = self.server.context.wrap_socket(
2106 self.sock, server_side=True)
2107 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2108 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Christian Heimes529525f2018-05-23 22:24:45 +02002109 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002110 # We treat ConnectionResetError as though it were an
2111 # SSLError - OpenSSL on Ubuntu abruptly closes the
2112 # connection when asked to use an unsupported protocol.
2113 #
Christian Heimes529525f2018-05-23 22:24:45 +02002114 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2115 # tries to send session tickets after handshake.
2116 # https://github.com/openssl/openssl/issues/6342
2117 self.server.conn_errors.append(str(e))
2118 if self.server.chatty:
2119 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2120 self.running = False
2121 self.close()
2122 return False
2123 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002124 # OSError may occur with wrong protocols, e.g. both
2125 # sides use PROTOCOL_TLS_SERVER.
2126 #
2127 # XXX Various errors can have happened here, for example
2128 # a mismatching protocol version, an invalid certificate,
2129 # or a low-level bug. This should be made more discriminating.
2130 #
2131 # bpo-31323: Store the exception as string to prevent
2132 # a reference leak: server -> conn_errors -> exception
2133 # -> traceback -> self (ConnectionHandler) -> server
2134 self.server.conn_errors.append(str(e))
2135 if self.server.chatty:
2136 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2137 self.running = False
2138 self.server.stop()
2139 self.close()
2140 return False
2141 else:
2142 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2143 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2144 cert = self.sslconn.getpeercert()
2145 if support.verbose and self.server.chatty:
2146 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2147 cert_binary = self.sslconn.getpeercert(True)
2148 if support.verbose and self.server.chatty:
2149 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2150 cipher = self.sslconn.cipher()
2151 if support.verbose and self.server.chatty:
2152 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2153 sys.stdout.write(" server: selected protocol is now "
2154 + str(self.sslconn.selected_npn_protocol()) + "\n")
2155 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002156
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002157 def read(self):
2158 if self.sslconn:
2159 return self.sslconn.read()
2160 else:
2161 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002162
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002163 def write(self, bytes):
2164 if self.sslconn:
2165 return self.sslconn.write(bytes)
2166 else:
2167 return self.sock.send(bytes)
2168
2169 def close(self):
2170 if self.sslconn:
2171 self.sslconn.close()
2172 else:
2173 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002174
Antoine Pitrou480a1242010-04-28 21:37:09 +00002175 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002176 self.running = True
2177 if not self.server.starttls_server:
2178 if not self.wrap_conn():
2179 return
2180 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002181 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002182 msg = self.read()
2183 stripped = msg.strip()
2184 if not stripped:
2185 # eof, so quit this handler
2186 self.running = False
2187 try:
2188 self.sock = self.sslconn.unwrap()
2189 except OSError:
2190 # Many tests shut the TCP connection down
2191 # without an SSL shutdown. This causes
2192 # unwrap() to raise OSError with errno=0!
2193 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002194 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002195 self.sslconn = None
2196 self.close()
2197 elif stripped == b'over':
2198 if support.verbose and self.server.connectionchatty:
2199 sys.stdout.write(" server: client closed connection\n")
2200 self.close()
2201 return
2202 elif (self.server.starttls_server and
2203 stripped == b'STARTTLS'):
2204 if support.verbose and self.server.connectionchatty:
2205 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2206 self.write(b"OK\n")
2207 if not self.wrap_conn():
2208 return
2209 elif (self.server.starttls_server and self.sslconn
2210 and stripped == b'ENDTLS'):
2211 if support.verbose and self.server.connectionchatty:
2212 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2213 self.write(b"OK\n")
2214 self.sock = self.sslconn.unwrap()
2215 self.sslconn = None
2216 if support.verbose and self.server.connectionchatty:
2217 sys.stdout.write(" server: connection is now unencrypted...\n")
2218 elif stripped == b'CB tls-unique':
2219 if support.verbose and self.server.connectionchatty:
2220 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2221 data = self.sslconn.get_channel_binding("tls-unique")
2222 self.write(repr(data).encode("us-ascii") + b"\n")
2223 else:
2224 if (support.verbose and
2225 self.server.connectionchatty):
2226 ctype = (self.sslconn and "encrypted") or "unencrypted"
2227 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2228 % (msg, ctype, msg.lower(), ctype))
2229 self.write(msg.lower())
Christian Heimes529525f2018-05-23 22:24:45 +02002230 except ConnectionResetError:
2231 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2232 # when connection is not shut down gracefully.
2233 if self.server.chatty and support.verbose:
2234 sys.stdout.write(
2235 " Connection reset by peer: {}\n".format(
2236 self.addr)
2237 )
2238 self.close()
2239 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002240 except OSError:
2241 if self.server.chatty:
2242 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002243 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002244 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002245
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002246 # normally, we'd just stop here, but for the test
2247 # harness, we want to stop the server
2248 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002249
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002250 def __init__(self, certificate=None, ssl_version=None,
2251 certreqs=None, cacerts=None,
2252 chatty=True, connectionchatty=False, starttls_server=False,
2253 npn_protocols=None, alpn_protocols=None,
2254 ciphers=None, context=None):
2255 if context:
2256 self.context = context
2257 else:
2258 self.context = ssl.SSLContext(ssl_version
2259 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002260 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002261 self.context.verify_mode = (certreqs if certreqs is not None
2262 else ssl.CERT_NONE)
2263 if cacerts:
2264 self.context.load_verify_locations(cacerts)
2265 if certificate:
2266 self.context.load_cert_chain(certificate)
2267 if npn_protocols:
2268 self.context.set_npn_protocols(npn_protocols)
2269 if alpn_protocols:
2270 self.context.set_alpn_protocols(alpn_protocols)
2271 if ciphers:
2272 self.context.set_ciphers(ciphers)
2273 self.chatty = chatty
2274 self.connectionchatty = connectionchatty
2275 self.starttls_server = starttls_server
2276 self.sock = socket.socket()
2277 self.port = support.bind_port(self.sock)
2278 self.flag = None
2279 self.active = False
2280 self.selected_npn_protocols = []
2281 self.selected_alpn_protocols = []
2282 self.shared_ciphers = []
2283 self.conn_errors = []
2284 threading.Thread.__init__(self)
2285 self.daemon = True
2286
2287 def __enter__(self):
2288 self.start(threading.Event())
2289 self.flag.wait()
2290 return self
2291
2292 def __exit__(self, *args):
2293 self.stop()
2294 self.join()
2295
2296 def start(self, flag=None):
2297 self.flag = flag
2298 threading.Thread.start(self)
2299
2300 def run(self):
2301 self.sock.settimeout(0.05)
2302 self.sock.listen()
2303 self.active = True
2304 if self.flag:
2305 # signal an event
2306 self.flag.set()
2307 while self.active:
2308 try:
2309 newconn, connaddr = self.sock.accept()
2310 if support.verbose and self.chatty:
2311 sys.stdout.write(' server: new connection from '
2312 + repr(connaddr) + '\n')
2313 handler = self.ConnectionHandler(self, newconn, connaddr)
2314 handler.start()
2315 handler.join()
2316 except socket.timeout:
2317 pass
2318 except KeyboardInterrupt:
2319 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002320 except BaseException as e:
2321 if support.verbose and self.chatty:
2322 sys.stdout.write(
2323 ' connection handling failed: ' + repr(e) + '\n')
2324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 self.sock.close()
2326
2327 def stop(self):
2328 self.active = False
2329
2330class AsyncoreEchoServer(threading.Thread):
2331
2332 # this one's based on asyncore.dispatcher
2333
2334 class EchoServer (asyncore.dispatcher):
2335
2336 class ConnectionHandler(asyncore.dispatcher_with_send):
2337
2338 def __init__(self, conn, certfile):
2339 self.socket = test_wrap_socket(conn, server_side=True,
2340 certfile=certfile,
2341 do_handshake_on_connect=False)
2342 asyncore.dispatcher_with_send.__init__(self, self.socket)
2343 self._ssl_accepting = True
2344 self._do_ssl_handshake()
2345
2346 def readable(self):
2347 if isinstance(self.socket, ssl.SSLSocket):
2348 while self.socket.pending() > 0:
2349 self.handle_read_event()
2350 return True
2351
2352 def _do_ssl_handshake(self):
2353 try:
2354 self.socket.do_handshake()
2355 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2356 return
2357 except ssl.SSLEOFError:
2358 return self.handle_close()
2359 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002360 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002361 except OSError as err:
2362 if err.args[0] == errno.ECONNABORTED:
2363 return self.handle_close()
2364 else:
2365 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002366
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002367 def handle_read(self):
2368 if self._ssl_accepting:
2369 self._do_ssl_handshake()
2370 else:
2371 data = self.recv(1024)
2372 if support.verbose:
2373 sys.stdout.write(" server: read %s from client\n" % repr(data))
2374 if not data:
2375 self.close()
2376 else:
2377 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002378
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002379 def handle_close(self):
2380 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002381 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002382 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002383
2384 def handle_error(self):
2385 raise
2386
Trent Nelson78520002008-04-10 20:54:35 +00002387 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002388 self.certfile = certfile
2389 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2390 self.port = support.bind_port(sock, '')
2391 asyncore.dispatcher.__init__(self, sock)
2392 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002393
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002394 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002395 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002396 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2397 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002398
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002399 def handle_error(self):
2400 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002401
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002402 def __init__(self, certfile):
2403 self.flag = None
2404 self.active = False
2405 self.server = self.EchoServer(certfile)
2406 self.port = self.server.port
2407 threading.Thread.__init__(self)
2408 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002409
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002410 def __str__(self):
2411 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002412
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002413 def __enter__(self):
2414 self.start(threading.Event())
2415 self.flag.wait()
2416 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002417
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002418 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002419 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002420 sys.stdout.write(" cleanup: stopping server.\n")
2421 self.stop()
2422 if support.verbose:
2423 sys.stdout.write(" cleanup: joining server thread.\n")
2424 self.join()
2425 if support.verbose:
2426 sys.stdout.write(" cleanup: successfully joined.\n")
2427 # make sure that ConnectionHandler is removed from socket_map
2428 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002429
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002430 def start (self, flag=None):
2431 self.flag = flag
2432 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002433
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002434 def run(self):
2435 self.active = True
2436 if self.flag:
2437 self.flag.set()
2438 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002439 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002440 asyncore.loop(1)
2441 except:
2442 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002443
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002444 def stop(self):
2445 self.active = False
2446 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002447
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002448def server_params_test(client_context, server_context, indata=b"FOO\n",
2449 chatty=True, connectionchatty=False, sni_name=None,
2450 session=None):
2451 """
2452 Launch a server, connect a client to it and try various reads
2453 and writes.
2454 """
2455 stats = {}
2456 server = ThreadedEchoServer(context=server_context,
2457 chatty=chatty,
2458 connectionchatty=False)
2459 with server:
2460 with client_context.wrap_socket(socket.socket(),
2461 server_hostname=sni_name, session=session) as s:
2462 s.connect((HOST, server.port))
2463 for arg in [indata, bytearray(indata), memoryview(indata)]:
2464 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002465 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002466 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002467 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002468 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002469 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002470 if connectionchatty:
2471 if support.verbose:
2472 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002473 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002474 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002475 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2476 % (outdata[:20], len(outdata),
2477 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002478 s.write(b"over\n")
2479 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002480 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002481 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002482 stats.update({
2483 'compression': s.compression(),
2484 'cipher': s.cipher(),
2485 'peercert': s.getpeercert(),
2486 'client_alpn_protocol': s.selected_alpn_protocol(),
2487 'client_npn_protocol': s.selected_npn_protocol(),
2488 'version': s.version(),
2489 'session_reused': s.session_reused,
2490 'session': s.session,
2491 })
2492 s.close()
2493 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2494 stats['server_npn_protocols'] = server.selected_npn_protocols
2495 stats['server_shared_ciphers'] = server.shared_ciphers
2496 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002497
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002498def try_protocol_combo(server_protocol, client_protocol, expect_success,
2499 certsreqs=None, server_options=0, client_options=0):
2500 """
2501 Try to SSL-connect using *client_protocol* to *server_protocol*.
2502 If *expect_success* is true, assert that the connection succeeds,
2503 if it's false, assert that the connection fails.
2504 Also, if *expect_success* is a string, assert that it is the protocol
2505 version actually used by the connection.
2506 """
2507 if certsreqs is None:
2508 certsreqs = ssl.CERT_NONE
2509 certtype = {
2510 ssl.CERT_NONE: "CERT_NONE",
2511 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2512 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2513 }[certsreqs]
2514 if support.verbose:
2515 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2516 sys.stdout.write(formatstr %
2517 (ssl.get_protocol_name(client_protocol),
2518 ssl.get_protocol_name(server_protocol),
2519 certtype))
2520 client_context = ssl.SSLContext(client_protocol)
2521 client_context.options |= client_options
2522 server_context = ssl.SSLContext(server_protocol)
2523 server_context.options |= server_options
2524
2525 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2526 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2527 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002528 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 client_context.set_ciphers("ALL")
2530
2531 for ctx in (client_context, server_context):
2532 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002533 ctx.load_cert_chain(SIGNED_CERTFILE)
2534 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535 try:
2536 stats = server_params_test(client_context, server_context,
2537 chatty=False, connectionchatty=False)
2538 # Protocol mismatch can result in either an SSLError, or a
2539 # "Connection reset by peer" error.
2540 except ssl.SSLError:
2541 if expect_success:
2542 raise
2543 except OSError as e:
2544 if expect_success or e.errno != errno.ECONNRESET:
2545 raise
2546 else:
2547 if not expect_success:
2548 raise AssertionError(
2549 "Client protocol %s succeeded with server protocol %s!"
2550 % (ssl.get_protocol_name(client_protocol),
2551 ssl.get_protocol_name(server_protocol)))
2552 elif (expect_success is not True
2553 and expect_success != stats['version']):
2554 raise AssertionError("version mismatch: expected %r, got %r"
2555 % (expect_success, stats['version']))
2556
2557
2558class ThreadedTests(unittest.TestCase):
2559
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002560 def test_echo(self):
2561 """Basic test of an SSL client connecting to a server"""
2562 if support.verbose:
2563 sys.stdout.write("\n")
2564 for protocol in PROTOCOLS:
2565 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2566 continue
2567 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2568 context = ssl.SSLContext(protocol)
2569 context.load_cert_chain(CERTFILE)
2570 server_params_test(context, context,
2571 chatty=True, connectionchatty=True)
2572
Christian Heimesa170fa12017-09-15 20:27:30 +02002573 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002574
2575 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2576 server_params_test(client_context=client_context,
2577 server_context=server_context,
2578 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002579 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002580
2581 client_context.check_hostname = False
2582 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2583 with self.assertRaises(ssl.SSLError) as e:
2584 server_params_test(client_context=server_context,
2585 server_context=client_context,
2586 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002587 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002588 self.assertIn('called a function you should not call',
2589 str(e.exception))
2590
2591 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2592 with self.assertRaises(ssl.SSLError) as e:
2593 server_params_test(client_context=server_context,
2594 server_context=server_context,
2595 chatty=True, connectionchatty=True)
2596 self.assertIn('called a function you should not call',
2597 str(e.exception))
2598
2599 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2600 with self.assertRaises(ssl.SSLError) as e:
2601 server_params_test(client_context=server_context,
2602 server_context=client_context,
2603 chatty=True, connectionchatty=True)
2604 self.assertIn('called a function you should not call',
2605 str(e.exception))
2606
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002607 def test_getpeercert(self):
2608 if support.verbose:
2609 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002610
2611 client_context, server_context, hostname = testing_context()
2612 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002613 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002614 with client_context.wrap_socket(socket.socket(),
2615 do_handshake_on_connect=False,
2616 server_hostname=hostname) as s:
2617 s.connect((HOST, server.port))
2618 # getpeercert() raise ValueError while the handshake isn't
2619 # done.
2620 with self.assertRaises(ValueError):
2621 s.getpeercert()
2622 s.do_handshake()
2623 cert = s.getpeercert()
2624 self.assertTrue(cert, "Can't get peer certificate.")
2625 cipher = s.cipher()
2626 if support.verbose:
2627 sys.stdout.write(pprint.pformat(cert) + '\n')
2628 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2629 if 'subject' not in cert:
2630 self.fail("No subject field in certificate: %s." %
2631 pprint.pformat(cert))
2632 if ((('organizationName', 'Python Software Foundation'),)
2633 not in cert['subject']):
2634 self.fail(
2635 "Missing or invalid 'organizationName' field in certificate subject; "
2636 "should be 'Python Software Foundation'.")
2637 self.assertIn('notBefore', cert)
2638 self.assertIn('notAfter', cert)
2639 before = ssl.cert_time_to_seconds(cert['notBefore'])
2640 after = ssl.cert_time_to_seconds(cert['notAfter'])
2641 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002642
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002643 @unittest.skipUnless(have_verify_flags(),
2644 "verify_flags need OpenSSL > 0.9.8")
2645 def test_crl_check(self):
2646 if support.verbose:
2647 sys.stdout.write("\n")
2648
Christian Heimesa170fa12017-09-15 20:27:30 +02002649 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002650
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002652 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002653
2654 # VERIFY_DEFAULT should pass
2655 server = ThreadedEchoServer(context=server_context, chatty=True)
2656 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002657 with client_context.wrap_socket(socket.socket(),
2658 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002659 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002660 cert = s.getpeercert()
2661 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002662
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002663 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002664 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002665
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002666 server = ThreadedEchoServer(context=server_context, chatty=True)
2667 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002668 with client_context.wrap_socket(socket.socket(),
2669 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002670 with self.assertRaisesRegex(ssl.SSLError,
2671 "certificate verify failed"):
2672 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002673
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002674 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002675 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002676
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002677 server = ThreadedEchoServer(context=server_context, chatty=True)
2678 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002679 with client_context.wrap_socket(socket.socket(),
2680 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002681 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002682 cert = s.getpeercert()
2683 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002684
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002685 def test_check_hostname(self):
2686 if support.verbose:
2687 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002688
Christian Heimesa170fa12017-09-15 20:27:30 +02002689 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002690
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002691 # correct hostname should verify
2692 server = ThreadedEchoServer(context=server_context, chatty=True)
2693 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002694 with client_context.wrap_socket(socket.socket(),
2695 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002696 s.connect((HOST, server.port))
2697 cert = s.getpeercert()
2698 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002699
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 # incorrect hostname should raise an exception
2701 server = ThreadedEchoServer(context=server_context, chatty=True)
2702 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002703 with client_context.wrap_socket(socket.socket(),
2704 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002705 with self.assertRaisesRegex(
2706 ssl.CertificateError,
2707 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002708 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002709
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002710 # missing server_hostname arg should cause an exception, too
2711 server = ThreadedEchoServer(context=server_context, chatty=True)
2712 with server:
2713 with socket.socket() as s:
2714 with self.assertRaisesRegex(ValueError,
2715 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002716 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002717
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002718 def test_ecc_cert(self):
2719 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2720 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002721 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002722 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2723
2724 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2725 # load ECC cert
2726 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2727
2728 # correct hostname should verify
2729 server = ThreadedEchoServer(context=server_context, chatty=True)
2730 with server:
2731 with client_context.wrap_socket(socket.socket(),
2732 server_hostname=hostname) as s:
2733 s.connect((HOST, server.port))
2734 cert = s.getpeercert()
2735 self.assertTrue(cert, "Can't get peer certificate.")
2736 cipher = s.cipher()[0].split('-')
2737 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2738
2739 def test_dual_rsa_ecc(self):
2740 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2741 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002742 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2743 # algorithms.
2744 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002745 # only ECDSA certs
2746 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2747 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2748
2749 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2750 # load ECC and RSA key/cert pairs
2751 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2752 server_context.load_cert_chain(SIGNED_CERTFILE)
2753
2754 # correct hostname should verify
2755 server = ThreadedEchoServer(context=server_context, chatty=True)
2756 with server:
2757 with client_context.wrap_socket(socket.socket(),
2758 server_hostname=hostname) as s:
2759 s.connect((HOST, server.port))
2760 cert = s.getpeercert()
2761 self.assertTrue(cert, "Can't get peer certificate.")
2762 cipher = s.cipher()[0].split('-')
2763 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2764
Christian Heimes66e57422018-01-29 14:25:13 +01002765 def test_check_hostname_idn(self):
2766 if support.verbose:
2767 sys.stdout.write("\n")
2768
Christian Heimes11a14932018-02-24 02:35:08 +01002769 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002770 server_context.load_cert_chain(IDNSANSFILE)
2771
Christian Heimes11a14932018-02-24 02:35:08 +01002772 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002773 context.verify_mode = ssl.CERT_REQUIRED
2774 context.check_hostname = True
2775 context.load_verify_locations(SIGNING_CA)
2776
2777 # correct hostname should verify, when specified in several
2778 # different ways
2779 idn_hostnames = [
2780 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002781 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002782 ('xn--knig-5qa.idn.pythontest.net',
2783 'xn--knig-5qa.idn.pythontest.net'),
2784 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002785 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002786
2787 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002788 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002789 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2790 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2791 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002792 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2793
2794 # ('königsgäßchen.idna2008.pythontest.net',
2795 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2796 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2797 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2798 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2799 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2800
Christian Heimes66e57422018-01-29 14:25:13 +01002801 ]
2802 for server_hostname, expected_hostname in idn_hostnames:
2803 server = ThreadedEchoServer(context=server_context, chatty=True)
2804 with server:
2805 with context.wrap_socket(socket.socket(),
2806 server_hostname=server_hostname) as s:
2807 self.assertEqual(s.server_hostname, expected_hostname)
2808 s.connect((HOST, server.port))
2809 cert = s.getpeercert()
2810 self.assertEqual(s.server_hostname, expected_hostname)
2811 self.assertTrue(cert, "Can't get peer certificate.")
2812
Christian Heimes66e57422018-01-29 14:25:13 +01002813 # incorrect hostname should raise an exception
2814 server = ThreadedEchoServer(context=server_context, chatty=True)
2815 with server:
2816 with context.wrap_socket(socket.socket(),
2817 server_hostname="python.example.org") as s:
2818 with self.assertRaises(ssl.CertificateError):
2819 s.connect((HOST, server.port))
2820
Christian Heimes529525f2018-05-23 22:24:45 +02002821 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002822 """Connecting when the server rejects the client's certificate
2823
2824 Launch a server with CERT_REQUIRED, and check that trying to
2825 connect to it with a wrong client certificate fails.
2826 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002827 client_context, server_context, hostname = testing_context()
2828 # load client cert
2829 client_context.load_cert_chain(WRONG_CERT)
2830 # require TLS client authentication
2831 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002832 # TLS 1.3 has different handshake
2833 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002834
2835 server = ThreadedEchoServer(
2836 context=server_context, chatty=True, connectionchatty=True,
2837 )
2838
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002839 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002840 client_context.wrap_socket(socket.socket(),
2841 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002842 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002843 # Expect either an SSL error about the server rejecting
2844 # the connection, or a low-level connection reset (which
2845 # sometimes happens on Windows)
2846 s.connect((HOST, server.port))
2847 except ssl.SSLError as e:
2848 if support.verbose:
2849 sys.stdout.write("\nSSLError is %r\n" % e)
2850 except OSError as e:
2851 if e.errno != errno.ECONNRESET:
2852 raise
2853 if support.verbose:
2854 sys.stdout.write("\nsocket.error is %r\n" % e)
2855 else:
2856 self.fail("Use of invalid cert should have failed!")
2857
Christian Heimes529525f2018-05-23 22:24:45 +02002858 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2859 def test_wrong_cert_tls13(self):
2860 client_context, server_context, hostname = testing_context()
2861 client_context.load_cert_chain(WRONG_CERT)
2862 server_context.verify_mode = ssl.CERT_REQUIRED
2863 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2864 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2865
2866 server = ThreadedEchoServer(
2867 context=server_context, chatty=True, connectionchatty=True,
2868 )
2869 with server, \
2870 client_context.wrap_socket(socket.socket(),
2871 server_hostname=hostname) as s:
2872 # TLS 1.3 perform client cert exchange after handshake
2873 s.connect((HOST, server.port))
2874 try:
2875 s.write(b'data')
2876 s.read(4)
2877 except ssl.SSLError as e:
2878 if support.verbose:
2879 sys.stdout.write("\nSSLError is %r\n" % e)
2880 except OSError as e:
2881 if e.errno != errno.ECONNRESET:
2882 raise
2883 if support.verbose:
2884 sys.stdout.write("\nsocket.error is %r\n" % e)
2885 else:
2886 self.fail("Use of invalid cert should have failed!")
2887
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002888 def test_rude_shutdown(self):
2889 """A brutal shutdown of an SSL server should raise an OSError
2890 in the client when attempting handshake.
2891 """
2892 listener_ready = threading.Event()
2893 listener_gone = threading.Event()
2894
2895 s = socket.socket()
2896 port = support.bind_port(s, HOST)
2897
2898 # `listener` runs in a thread. It sits in an accept() until
2899 # the main thread connects. Then it rudely closes the socket,
2900 # and sets Event `listener_gone` to let the main thread know
2901 # the socket is gone.
2902 def listener():
2903 s.listen()
2904 listener_ready.set()
2905 newsock, addr = s.accept()
2906 newsock.close()
2907 s.close()
2908 listener_gone.set()
2909
2910 def connector():
2911 listener_ready.wait()
2912 with socket.socket() as c:
2913 c.connect((HOST, port))
2914 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002915 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002916 ssl_sock = test_wrap_socket(c)
2917 except OSError:
2918 pass
2919 else:
2920 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002921
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002922 t = threading.Thread(target=listener)
2923 t.start()
2924 try:
2925 connector()
2926 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002927 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002928
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002929 def test_ssl_cert_verify_error(self):
2930 if support.verbose:
2931 sys.stdout.write("\n")
2932
2933 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2934 server_context.load_cert_chain(SIGNED_CERTFILE)
2935
2936 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2937
2938 server = ThreadedEchoServer(context=server_context, chatty=True)
2939 with server:
2940 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002941 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002942 try:
2943 s.connect((HOST, server.port))
2944 except ssl.SSLError as e:
2945 msg = 'unable to get local issuer certificate'
2946 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2947 self.assertEqual(e.verify_code, 20)
2948 self.assertEqual(e.verify_message, msg)
2949 self.assertIn(msg, repr(e))
2950 self.assertIn('certificate verify failed', repr(e))
2951
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2953 "OpenSSL is compiled without SSLv2 support")
2954 def test_protocol_sslv2(self):
2955 """Connecting to an SSLv2 server with various client options"""
2956 if support.verbose:
2957 sys.stdout.write("\n")
2958 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2959 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2960 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002961 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002962 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2963 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2964 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2965 # SSLv23 client with specific SSL options
2966 if no_sslv2_implies_sslv3_hello():
2967 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002968 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002969 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002970 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002971 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002972 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002974
Christian Heimesa170fa12017-09-15 20:27:30 +02002975 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002976 """Connecting to an SSLv23 server with various client options"""
2977 if support.verbose:
2978 sys.stdout.write("\n")
2979 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002980 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002981 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002982 except OSError as x:
2983 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2984 if support.verbose:
2985 sys.stdout.write(
2986 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2987 % str(x))
2988 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002989 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2990 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2991 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002992
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002993 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002994 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2995 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2996 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002997
2998 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002999 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3000 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3001 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003002
3003 # Server with specific SSL options
3004 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003005 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003006 server_options=ssl.OP_NO_SSLv3)
3007 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003008 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003009 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003010 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003011 server_options=ssl.OP_NO_TLSv1)
3012
3013
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003014 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3015 "OpenSSL is compiled without SSLv3 support")
3016 def test_protocol_sslv3(self):
3017 """Connecting to an SSLv3 server with various client options"""
3018 if support.verbose:
3019 sys.stdout.write("\n")
3020 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3021 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3022 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3023 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3024 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003025 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003026 client_options=ssl.OP_NO_SSLv3)
3027 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3028 if no_sslv2_implies_sslv3_hello():
3029 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003030 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003031 False, client_options=ssl.OP_NO_SSLv2)
3032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 def test_protocol_tlsv1(self):
3034 """Connecting to a TLSv1 server with various client options"""
3035 if support.verbose:
3036 sys.stdout.write("\n")
3037 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3038 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3039 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3040 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3041 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3042 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3043 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003044 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003045 client_options=ssl.OP_NO_TLSv1)
3046
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003047 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3048 "TLS version 1.1 not supported.")
3049 def test_protocol_tlsv1_1(self):
3050 """Connecting to a TLSv1.1 server with various client options.
3051 Testing against older TLS versions."""
3052 if support.verbose:
3053 sys.stdout.write("\n")
3054 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3055 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3056 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3057 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3058 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003059 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003060 client_options=ssl.OP_NO_TLSv1_1)
3061
Christian Heimesa170fa12017-09-15 20:27:30 +02003062 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003063 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3064 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3065
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003066 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3067 "TLS version 1.2 not supported.")
3068 def test_protocol_tlsv1_2(self):
3069 """Connecting to a TLSv1.2 server with various client options.
3070 Testing against older TLS versions."""
3071 if support.verbose:
3072 sys.stdout.write("\n")
3073 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3074 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3075 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3076 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3077 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3078 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3079 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003080 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003081 client_options=ssl.OP_NO_TLSv1_2)
3082
Christian Heimesa170fa12017-09-15 20:27:30 +02003083 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3085 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3086 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3087 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3088
3089 def test_starttls(self):
3090 """Switching from clear text to encrypted and back again."""
3091 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3092
3093 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003094 starttls_server=True,
3095 chatty=True,
3096 connectionchatty=True)
3097 wrapped = False
3098 with server:
3099 s = socket.socket()
3100 s.setblocking(1)
3101 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003102 if support.verbose:
3103 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003104 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003105 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003106 sys.stdout.write(
3107 " client: sending %r...\n" % indata)
3108 if wrapped:
3109 conn.write(indata)
3110 outdata = conn.read()
3111 else:
3112 s.send(indata)
3113 outdata = s.recv(1024)
3114 msg = outdata.strip().lower()
3115 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3116 # STARTTLS ok, switch to secure mode
3117 if support.verbose:
3118 sys.stdout.write(
3119 " client: read %r from server, starting TLS...\n"
3120 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003121 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003122 wrapped = True
3123 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3124 # ENDTLS ok, switch back to clear text
3125 if support.verbose:
3126 sys.stdout.write(
3127 " client: read %r from server, ending TLS...\n"
3128 % msg)
3129 s = conn.unwrap()
3130 wrapped = False
3131 else:
3132 if support.verbose:
3133 sys.stdout.write(
3134 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003135 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003136 sys.stdout.write(" client: closing connection.\n")
3137 if wrapped:
3138 conn.write(b"over\n")
3139 else:
3140 s.send(b"over\n")
3141 if wrapped:
3142 conn.close()
3143 else:
3144 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003145
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003146 def test_socketserver(self):
3147 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003148 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003149 # try to connect
3150 if support.verbose:
3151 sys.stdout.write('\n')
3152 with open(CERTFILE, 'rb') as f:
3153 d1 = f.read()
3154 d2 = ''
3155 # now fetch the same data from the HTTPS server
3156 url = 'https://localhost:%d/%s' % (
3157 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003158 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003159 f = urllib.request.urlopen(url, context=context)
3160 try:
3161 dlen = f.info().get("content-length")
3162 if dlen and (int(dlen) > 0):
3163 d2 = f.read(int(dlen))
3164 if support.verbose:
3165 sys.stdout.write(
3166 " client: read %d bytes from remote server '%s'\n"
3167 % (len(d2), server))
3168 finally:
3169 f.close()
3170 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003171
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003172 def test_asyncore_server(self):
3173 """Check the example asyncore integration."""
3174 if support.verbose:
3175 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003176
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003177 indata = b"FOO\n"
3178 server = AsyncoreEchoServer(CERTFILE)
3179 with server:
3180 s = test_wrap_socket(socket.socket())
3181 s.connect(('127.0.0.1', server.port))
3182 if support.verbose:
3183 sys.stdout.write(
3184 " client: sending %r...\n" % indata)
3185 s.write(indata)
3186 outdata = s.read()
3187 if support.verbose:
3188 sys.stdout.write(" client: read %r\n" % outdata)
3189 if outdata != indata.lower():
3190 self.fail(
3191 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3192 % (outdata[:20], len(outdata),
3193 indata[:20].lower(), len(indata)))
3194 s.write(b"over\n")
3195 if support.verbose:
3196 sys.stdout.write(" client: closing connection.\n")
3197 s.close()
3198 if support.verbose:
3199 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003200
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003201 def test_recv_send(self):
3202 """Test recv(), send() and friends."""
3203 if support.verbose:
3204 sys.stdout.write("\n")
3205
3206 server = ThreadedEchoServer(CERTFILE,
3207 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003208 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 cacerts=CERTFILE,
3210 chatty=True,
3211 connectionchatty=False)
3212 with server:
3213 s = test_wrap_socket(socket.socket(),
3214 server_side=False,
3215 certfile=CERTFILE,
3216 ca_certs=CERTFILE,
3217 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003218 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 s.connect((HOST, server.port))
3220 # helper methods for standardising recv* method signatures
3221 def _recv_into():
3222 b = bytearray(b"\0"*100)
3223 count = s.recv_into(b)
3224 return b[:count]
3225
3226 def _recvfrom_into():
3227 b = bytearray(b"\0"*100)
3228 count, addr = s.recvfrom_into(b)
3229 return b[:count]
3230
3231 # (name, method, expect success?, *args, return value func)
3232 send_methods = [
3233 ('send', s.send, True, [], len),
3234 ('sendto', s.sendto, False, ["some.address"], len),
3235 ('sendall', s.sendall, True, [], lambda x: None),
3236 ]
3237 # (name, method, whether to expect success, *args)
3238 recv_methods = [
3239 ('recv', s.recv, True, []),
3240 ('recvfrom', s.recvfrom, False, ["some.address"]),
3241 ('recv_into', _recv_into, True, []),
3242 ('recvfrom_into', _recvfrom_into, False, []),
3243 ]
3244 data_prefix = "PREFIX_"
3245
3246 for (meth_name, send_meth, expect_success, args,
3247 ret_val_meth) in send_methods:
3248 indata = (data_prefix + meth_name).encode('ascii')
3249 try:
3250 ret = send_meth(indata, *args)
3251 msg = "sending with {}".format(meth_name)
3252 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3253 outdata = s.read()
3254 if outdata != indata.lower():
3255 self.fail(
3256 "While sending with <<{name:s}>> bad data "
3257 "<<{outdata:r}>> ({nout:d}) received; "
3258 "expected <<{indata:r}>> ({nin:d})\n".format(
3259 name=meth_name, outdata=outdata[:20],
3260 nout=len(outdata),
3261 indata=indata[:20], nin=len(indata)
3262 )
3263 )
3264 except ValueError as e:
3265 if expect_success:
3266 self.fail(
3267 "Failed to send with method <<{name:s}>>; "
3268 "expected to succeed.\n".format(name=meth_name)
3269 )
3270 if not str(e).startswith(meth_name):
3271 self.fail(
3272 "Method <<{name:s}>> failed with unexpected "
3273 "exception message: {exp:s}\n".format(
3274 name=meth_name, exp=e
3275 )
3276 )
3277
3278 for meth_name, recv_meth, expect_success, args in recv_methods:
3279 indata = (data_prefix + meth_name).encode('ascii')
3280 try:
3281 s.send(indata)
3282 outdata = recv_meth(*args)
3283 if outdata != indata.lower():
3284 self.fail(
3285 "While receiving with <<{name:s}>> bad data "
3286 "<<{outdata:r}>> ({nout:d}) received; "
3287 "expected <<{indata:r}>> ({nin:d})\n".format(
3288 name=meth_name, outdata=outdata[:20],
3289 nout=len(outdata),
3290 indata=indata[:20], nin=len(indata)
3291 )
3292 )
3293 except ValueError as e:
3294 if expect_success:
3295 self.fail(
3296 "Failed to receive with method <<{name:s}>>; "
3297 "expected to succeed.\n".format(name=meth_name)
3298 )
3299 if not str(e).startswith(meth_name):
3300 self.fail(
3301 "Method <<{name:s}>> failed with unexpected "
3302 "exception message: {exp:s}\n".format(
3303 name=meth_name, exp=e
3304 )
3305 )
3306 # consume data
3307 s.read()
3308
3309 # read(-1, buffer) is supported, even though read(-1) is not
3310 data = b"data"
3311 s.send(data)
3312 buffer = bytearray(len(data))
3313 self.assertEqual(s.read(-1, buffer), len(data))
3314 self.assertEqual(buffer, data)
3315
Christian Heimes888bbdc2017-09-07 14:18:21 -07003316 # sendall accepts bytes-like objects
3317 if ctypes is not None:
3318 ubyte = ctypes.c_ubyte * len(data)
3319 byteslike = ubyte.from_buffer_copy(data)
3320 s.sendall(byteslike)
3321 self.assertEqual(s.read(), data)
3322
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003323 # Make sure sendmsg et al are disallowed to avoid
3324 # inadvertent disclosure of data and/or corruption
3325 # of the encrypted data stream
3326 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3327 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3328 self.assertRaises(NotImplementedError,
3329 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 s.write(b"over\n")
3331
3332 self.assertRaises(ValueError, s.recv, -1)
3333 self.assertRaises(ValueError, s.read, -1)
3334
3335 s.close()
3336
3337 def test_recv_zero(self):
3338 server = ThreadedEchoServer(CERTFILE)
3339 server.__enter__()
3340 self.addCleanup(server.__exit__, None, None)
3341 s = socket.create_connection((HOST, server.port))
3342 self.addCleanup(s.close)
3343 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3344 self.addCleanup(s.close)
3345
3346 # recv/read(0) should return no data
3347 s.send(b"data")
3348 self.assertEqual(s.recv(0), b"")
3349 self.assertEqual(s.read(0), b"")
3350 self.assertEqual(s.read(), b"data")
3351
3352 # Should not block if the other end sends no data
3353 s.setblocking(False)
3354 self.assertEqual(s.recv(0), b"")
3355 self.assertEqual(s.recv_into(bytearray()), 0)
3356
3357 def test_nonblocking_send(self):
3358 server = ThreadedEchoServer(CERTFILE,
3359 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003360 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003361 cacerts=CERTFILE,
3362 chatty=True,
3363 connectionchatty=False)
3364 with server:
3365 s = test_wrap_socket(socket.socket(),
3366 server_side=False,
3367 certfile=CERTFILE,
3368 ca_certs=CERTFILE,
3369 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003370 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003371 s.connect((HOST, server.port))
3372 s.setblocking(False)
3373
3374 # If we keep sending data, at some point the buffers
3375 # will be full and the call will block
3376 buf = bytearray(8192)
3377 def fill_buffer():
3378 while True:
3379 s.send(buf)
3380 self.assertRaises((ssl.SSLWantWriteError,
3381 ssl.SSLWantReadError), fill_buffer)
3382
3383 # Now read all the output and discard it
3384 s.setblocking(True)
3385 s.close()
3386
3387 def test_handshake_timeout(self):
3388 # Issue #5103: SSL handshake must respect the socket timeout
3389 server = socket.socket(socket.AF_INET)
3390 host = "127.0.0.1"
3391 port = support.bind_port(server)
3392 started = threading.Event()
3393 finish = False
3394
3395 def serve():
3396 server.listen()
3397 started.set()
3398 conns = []
3399 while not finish:
3400 r, w, e = select.select([server], [], [], 0.1)
3401 if server in r:
3402 # Let the socket hang around rather than having
3403 # it closed by garbage collection.
3404 conns.append(server.accept()[0])
3405 for sock in conns:
3406 sock.close()
3407
3408 t = threading.Thread(target=serve)
3409 t.start()
3410 started.wait()
3411
3412 try:
3413 try:
3414 c = socket.socket(socket.AF_INET)
3415 c.settimeout(0.2)
3416 c.connect((host, port))
3417 # Will attempt handshake and time out
3418 self.assertRaisesRegex(socket.timeout, "timed out",
3419 test_wrap_socket, c)
3420 finally:
3421 c.close()
3422 try:
3423 c = socket.socket(socket.AF_INET)
3424 c = test_wrap_socket(c)
3425 c.settimeout(0.2)
3426 # Will attempt handshake and time out
3427 self.assertRaisesRegex(socket.timeout, "timed out",
3428 c.connect, (host, port))
3429 finally:
3430 c.close()
3431 finally:
3432 finish = True
3433 t.join()
3434 server.close()
3435
3436 def test_server_accept(self):
3437 # Issue #16357: accept() on a SSLSocket created through
3438 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003439 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003440 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003441 context.load_verify_locations(SIGNING_CA)
3442 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003443 server = socket.socket(socket.AF_INET)
3444 host = "127.0.0.1"
3445 port = support.bind_port(server)
3446 server = context.wrap_socket(server, server_side=True)
3447 self.assertTrue(server.server_side)
3448
3449 evt = threading.Event()
3450 remote = None
3451 peer = None
3452 def serve():
3453 nonlocal remote, peer
3454 server.listen()
3455 # Block on the accept and wait on the connection to close.
3456 evt.set()
3457 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003458 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003459
3460 t = threading.Thread(target=serve)
3461 t.start()
3462 # Client wait until server setup and perform a connect.
3463 evt.wait()
3464 client = context.wrap_socket(socket.socket())
3465 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003466 client.send(b'data')
3467 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003468 client_addr = client.getsockname()
3469 client.close()
3470 t.join()
3471 remote.close()
3472 server.close()
3473 # Sanity checks.
3474 self.assertIsInstance(remote, ssl.SSLSocket)
3475 self.assertEqual(peer, client_addr)
3476
3477 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003478 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003479 with context.wrap_socket(socket.socket()) as sock:
3480 with self.assertRaises(OSError) as cm:
3481 sock.getpeercert()
3482 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3483
3484 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003485 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003486 with context.wrap_socket(socket.socket()) as sock:
3487 with self.assertRaises(OSError) as cm:
3488 sock.do_handshake()
3489 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3490
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003491 def test_no_shared_ciphers(self):
3492 client_context, server_context, hostname = testing_context()
3493 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3494 client_context.options |= ssl.OP_NO_TLSv1_3
3495 # Force different suites on client and master
3496 client_context.set_ciphers("AES128")
3497 server_context.set_ciphers("AES256")
3498 with ThreadedEchoServer(context=server_context) as server:
3499 with client_context.wrap_socket(socket.socket(),
3500 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003501 with self.assertRaises(OSError):
3502 s.connect((HOST, server.port))
3503 self.assertIn("no shared cipher", server.conn_errors[0])
3504
3505 def test_version_basic(self):
3506 """
3507 Basic tests for SSLSocket.version().
3508 More tests are done in the test_protocol_*() methods.
3509 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003510 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3511 context.check_hostname = False
3512 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003513 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003514 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003515 chatty=False) as server:
3516 with context.wrap_socket(socket.socket()) as s:
3517 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003518 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003519 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003520 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003521 self.assertEqual(s.version(), 'TLSv1.3')
3522 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003523 self.assertEqual(s.version(), 'TLSv1.2')
3524 else: # 0.9.8 to 1.0.1
3525 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003526 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003527 self.assertIs(s.version(), None)
3528
Christian Heimescb5b68a2017-09-07 18:07:00 -07003529 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3530 "test requires TLSv1.3 enabled OpenSSL")
3531 def test_tls1_3(self):
3532 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3533 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003534 context.options |= (
3535 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3536 )
3537 with ThreadedEchoServer(context=context) as server:
3538 with context.wrap_socket(socket.socket()) as s:
3539 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003540 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003541 'TLS_AES_256_GCM_SHA384',
3542 'TLS_CHACHA20_POLY1305_SHA256',
3543 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003544 })
3545 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003546
Christian Heimes698dde12018-02-27 11:54:43 +01003547 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3548 "required OpenSSL 1.1.0g")
3549 def test_min_max_version(self):
3550 client_context, server_context, hostname = testing_context()
3551 # client TLSv1.0 to 1.2
3552 client_context.minimum_version = ssl.TLSVersion.TLSv1
3553 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3554 # server only TLSv1.2
3555 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3556 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3557
3558 with ThreadedEchoServer(context=server_context) as server:
3559 with client_context.wrap_socket(socket.socket(),
3560 server_hostname=hostname) as s:
3561 s.connect((HOST, server.port))
3562 self.assertEqual(s.version(), 'TLSv1.2')
3563
3564 # client 1.0 to 1.2, server 1.0 to 1.1
3565 server_context.minimum_version = ssl.TLSVersion.TLSv1
3566 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3567
3568 with ThreadedEchoServer(context=server_context) as server:
3569 with client_context.wrap_socket(socket.socket(),
3570 server_hostname=hostname) as s:
3571 s.connect((HOST, server.port))
3572 self.assertEqual(s.version(), 'TLSv1.1')
3573
3574 # client 1.0, server 1.2 (mismatch)
3575 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3576 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3577 client_context.minimum_version = ssl.TLSVersion.TLSv1
3578 client_context.maximum_version = ssl.TLSVersion.TLSv1
3579 with ThreadedEchoServer(context=server_context) as server:
3580 with client_context.wrap_socket(socket.socket(),
3581 server_hostname=hostname) as s:
3582 with self.assertRaises(ssl.SSLError) as e:
3583 s.connect((HOST, server.port))
3584 self.assertIn("alert", str(e.exception))
3585
3586
3587 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3588 "required OpenSSL 1.1.0g")
3589 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3590 def test_min_max_version_sslv3(self):
3591 client_context, server_context, hostname = testing_context()
3592 server_context.minimum_version = ssl.TLSVersion.SSLv3
3593 client_context.minimum_version = ssl.TLSVersion.SSLv3
3594 client_context.maximum_version = ssl.TLSVersion.SSLv3
3595 with ThreadedEchoServer(context=server_context) as server:
3596 with client_context.wrap_socket(socket.socket(),
3597 server_hostname=hostname) as s:
3598 s.connect((HOST, server.port))
3599 self.assertEqual(s.version(), 'SSLv3')
3600
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003601 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3602 def test_default_ecdh_curve(self):
3603 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3604 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003605 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003606 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003607 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3608 # cipher name.
3609 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003610 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3611 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3612 # our default cipher list should prefer ECDH-based ciphers
3613 # automatically.
3614 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3615 context.set_ciphers("ECCdraft:ECDH")
3616 with ThreadedEchoServer(context=context) as server:
3617 with context.wrap_socket(socket.socket()) as s:
3618 s.connect((HOST, server.port))
3619 self.assertIn("ECDH", s.cipher()[0])
3620
3621 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3622 "'tls-unique' channel binding not available")
3623 def test_tls_unique_channel_binding(self):
3624 """Test tls-unique channel binding."""
3625 if support.verbose:
3626 sys.stdout.write("\n")
3627
Christian Heimes05d9fe32018-02-27 08:55:39 +01003628 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003629
3630 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003631 chatty=True,
3632 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003633
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003634 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003635 with client_context.wrap_socket(
3636 socket.socket(),
3637 server_hostname=hostname) as s:
3638 s.connect((HOST, server.port))
3639 # get the data
3640 cb_data = s.get_channel_binding("tls-unique")
3641 if support.verbose:
3642 sys.stdout.write(
3643 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003644
Christian Heimes05d9fe32018-02-27 08:55:39 +01003645 # check if it is sane
3646 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003647 if s.version() == 'TLSv1.3':
3648 self.assertEqual(len(cb_data), 48)
3649 else:
3650 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003651
Christian Heimes05d9fe32018-02-27 08:55:39 +01003652 # and compare with the peers version
3653 s.write(b"CB tls-unique\n")
3654 peer_data_repr = s.read().strip()
3655 self.assertEqual(peer_data_repr,
3656 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003657
3658 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003659 with client_context.wrap_socket(
3660 socket.socket(),
3661 server_hostname=hostname) as s:
3662 s.connect((HOST, server.port))
3663 new_cb_data = s.get_channel_binding("tls-unique")
3664 if support.verbose:
3665 sys.stdout.write(
3666 "got another channel binding data: {0!r}\n".format(
3667 new_cb_data)
3668 )
3669 # is it really unique
3670 self.assertNotEqual(cb_data, new_cb_data)
3671 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003672 if s.version() == 'TLSv1.3':
3673 self.assertEqual(len(cb_data), 48)
3674 else:
3675 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003676 s.write(b"CB tls-unique\n")
3677 peer_data_repr = s.read().strip()
3678 self.assertEqual(peer_data_repr,
3679 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003680
3681 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003682 client_context, server_context, hostname = testing_context()
3683 stats = server_params_test(client_context, server_context,
3684 chatty=True, connectionchatty=True,
3685 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003686 if support.verbose:
3687 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3688 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3689
3690 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3691 "ssl.OP_NO_COMPRESSION needed for this test")
3692 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003693 client_context, server_context, hostname = testing_context()
3694 client_context.options |= ssl.OP_NO_COMPRESSION
3695 server_context.options |= ssl.OP_NO_COMPRESSION
3696 stats = server_params_test(client_context, server_context,
3697 chatty=True, connectionchatty=True,
3698 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003699 self.assertIs(stats['compression'], None)
3700
3701 def test_dh_params(self):
3702 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003703 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003704 # test scenario needs TLS <= 1.2
3705 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003706 server_context.load_dh_params(DHFILE)
3707 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003708 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003709 stats = server_params_test(client_context, server_context,
3710 chatty=True, connectionchatty=True,
3711 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003712 cipher = stats["cipher"][0]
3713 parts = cipher.split("-")
3714 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3715 self.fail("Non-DH cipher: " + cipher[0])
3716
Christian Heimesb7b92252018-02-25 09:49:31 +01003717 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003718 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003719 def test_ecdh_curve(self):
3720 # server secp384r1, client auto
3721 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003722
Christian Heimesb7b92252018-02-25 09:49:31 +01003723 server_context.set_ecdh_curve("secp384r1")
3724 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3725 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3726 stats = server_params_test(client_context, server_context,
3727 chatty=True, connectionchatty=True,
3728 sni_name=hostname)
3729
3730 # server auto, client secp384r1
3731 client_context, server_context, hostname = testing_context()
3732 client_context.set_ecdh_curve("secp384r1")
3733 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3734 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3735 stats = server_params_test(client_context, server_context,
3736 chatty=True, connectionchatty=True,
3737 sni_name=hostname)
3738
3739 # server / client curve mismatch
3740 client_context, server_context, hostname = testing_context()
3741 client_context.set_ecdh_curve("prime256v1")
3742 server_context.set_ecdh_curve("secp384r1")
3743 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3744 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3745 try:
3746 stats = server_params_test(client_context, server_context,
3747 chatty=True, connectionchatty=True,
3748 sni_name=hostname)
3749 except ssl.SSLError:
3750 pass
3751 else:
3752 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003753 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003754 self.fail("mismatch curve did not fail")
3755
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003756 def test_selected_alpn_protocol(self):
3757 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003758 client_context, server_context, hostname = testing_context()
3759 stats = server_params_test(client_context, server_context,
3760 chatty=True, connectionchatty=True,
3761 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003762 self.assertIs(stats['client_alpn_protocol'], None)
3763
3764 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3765 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3766 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003767 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003768 server_context.set_alpn_protocols(['foo', 'bar'])
3769 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003770 chatty=True, connectionchatty=True,
3771 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003772 self.assertIs(stats['client_alpn_protocol'], None)
3773
3774 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3775 def test_alpn_protocols(self):
3776 server_protocols = ['foo', 'bar', 'milkshake']
3777 protocol_tests = [
3778 (['foo', 'bar'], 'foo'),
3779 (['bar', 'foo'], 'foo'),
3780 (['milkshake'], 'milkshake'),
3781 (['http/3.0', 'http/4.0'], None)
3782 ]
3783 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003784 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003785 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003786 client_context.set_alpn_protocols(client_protocols)
3787
3788 try:
3789 stats = server_params_test(client_context,
3790 server_context,
3791 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003792 connectionchatty=True,
3793 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003794 except ssl.SSLError as e:
3795 stats = e
3796
Christian Heimes05d9fe32018-02-27 08:55:39 +01003797 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003798 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3799 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3800 self.assertIsInstance(stats, ssl.SSLError)
3801 else:
3802 msg = "failed trying %s (s) and %s (c).\n" \
3803 "was expecting %s, but got %%s from the %%s" \
3804 % (str(server_protocols), str(client_protocols),
3805 str(expected))
3806 client_result = stats['client_alpn_protocol']
3807 self.assertEqual(client_result, expected,
3808 msg % (client_result, "client"))
3809 server_result = stats['server_alpn_protocols'][-1] \
3810 if len(stats['server_alpn_protocols']) else 'nothing'
3811 self.assertEqual(server_result, expected,
3812 msg % (server_result, "server"))
3813
3814 def test_selected_npn_protocol(self):
3815 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003816 client_context, server_context, hostname = testing_context()
3817 stats = server_params_test(client_context, server_context,
3818 chatty=True, connectionchatty=True,
3819 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003820 self.assertIs(stats['client_npn_protocol'], None)
3821
3822 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3823 def test_npn_protocols(self):
3824 server_protocols = ['http/1.1', 'spdy/2']
3825 protocol_tests = [
3826 (['http/1.1', 'spdy/2'], 'http/1.1'),
3827 (['spdy/2', 'http/1.1'], 'http/1.1'),
3828 (['spdy/2', 'test'], 'spdy/2'),
3829 (['abc', 'def'], 'abc')
3830 ]
3831 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003832 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003833 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003834 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003835 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003836 chatty=True, connectionchatty=True,
3837 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003838 msg = "failed trying %s (s) and %s (c).\n" \
3839 "was expecting %s, but got %%s from the %%s" \
3840 % (str(server_protocols), str(client_protocols),
3841 str(expected))
3842 client_result = stats['client_npn_protocol']
3843 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3844 server_result = stats['server_npn_protocols'][-1] \
3845 if len(stats['server_npn_protocols']) else 'nothing'
3846 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3847
3848 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003849 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003850 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003851 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003852 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003853 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003854 client_context.load_verify_locations(SIGNING_CA)
3855 return server_context, other_context, client_context
3856
3857 def check_common_name(self, stats, name):
3858 cert = stats['peercert']
3859 self.assertIn((('commonName', name),), cert['subject'])
3860
3861 @needs_sni
3862 def test_sni_callback(self):
3863 calls = []
3864 server_context, other_context, client_context = self.sni_contexts()
3865
Christian Heimesa170fa12017-09-15 20:27:30 +02003866 client_context.check_hostname = False
3867
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003868 def servername_cb(ssl_sock, server_name, initial_context):
3869 calls.append((server_name, initial_context))
3870 if server_name is not None:
3871 ssl_sock.context = other_context
3872 server_context.set_servername_callback(servername_cb)
3873
3874 stats = server_params_test(client_context, server_context,
3875 chatty=True,
3876 sni_name='supermessage')
3877 # The hostname was fetched properly, and the certificate was
3878 # changed for the connection.
3879 self.assertEqual(calls, [("supermessage", server_context)])
3880 # CERTFILE4 was selected
3881 self.check_common_name(stats, 'fakehostname')
3882
3883 calls = []
3884 # The callback is called with server_name=None
3885 stats = server_params_test(client_context, server_context,
3886 chatty=True,
3887 sni_name=None)
3888 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003889 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003890
3891 # Check disabling the callback
3892 calls = []
3893 server_context.set_servername_callback(None)
3894
3895 stats = server_params_test(client_context, server_context,
3896 chatty=True,
3897 sni_name='notfunny')
3898 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003899 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003900 self.assertEqual(calls, [])
3901
3902 @needs_sni
3903 def test_sni_callback_alert(self):
3904 # Returning a TLS alert is reflected to the connecting client
3905 server_context, other_context, client_context = self.sni_contexts()
3906
3907 def cb_returning_alert(ssl_sock, server_name, initial_context):
3908 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3909 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910 with self.assertRaises(ssl.SSLError) as cm:
3911 stats = server_params_test(client_context, server_context,
3912 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003913 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003914 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003915
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003916 @needs_sni
3917 def test_sni_callback_raising(self):
3918 # Raising fails the connection with a TLS handshake failure alert.
3919 server_context, other_context, client_context = self.sni_contexts()
3920
3921 def cb_raising(ssl_sock, server_name, initial_context):
3922 1/0
3923 server_context.set_servername_callback(cb_raising)
3924
3925 with self.assertRaises(ssl.SSLError) as cm, \
3926 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003927 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003928 chatty=False,
3929 sni_name='supermessage')
3930 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3931 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003932
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003933 @needs_sni
3934 def test_sni_callback_wrong_return_type(self):
3935 # Returning the wrong return type terminates the TLS connection
3936 # with an internal error alert.
3937 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003938
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3940 return "foo"
3941 server_context.set_servername_callback(cb_wrong_return_type)
3942
3943 with self.assertRaises(ssl.SSLError) as cm, \
3944 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003945 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003946 chatty=False,
3947 sni_name='supermessage')
3948 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3949 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003950
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003951 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003952 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003953 client_context.set_ciphers("AES128:AES256")
3954 server_context.set_ciphers("AES256")
3955 expected_algs = [
3956 "AES256", "AES-256",
3957 # TLS 1.3 ciphers are always enabled
3958 "TLS_CHACHA20", "TLS_AES",
3959 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003960
Christian Heimesa170fa12017-09-15 20:27:30 +02003961 stats = server_params_test(client_context, server_context,
3962 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003963 ciphers = stats['server_shared_ciphers'][0]
3964 self.assertGreater(len(ciphers), 0)
3965 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003966 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003967 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003968
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003969 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003970 client_context, server_context, hostname = testing_context()
3971 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003972
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003973 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003974 s = client_context.wrap_socket(socket.socket(),
3975 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003976 s.connect((HOST, server.port))
3977 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003978
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003979 self.assertRaises(ValueError, s.read, 1024)
3980 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003981
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003982 def test_sendfile(self):
3983 TEST_DATA = b"x" * 512
3984 with open(support.TESTFN, 'wb') as f:
3985 f.write(TEST_DATA)
3986 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003987 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003988 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003989 context.load_verify_locations(SIGNING_CA)
3990 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003991 server = ThreadedEchoServer(context=context, chatty=False)
3992 with server:
3993 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003994 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003995 with open(support.TESTFN, 'rb') as file:
3996 s.sendfile(file)
3997 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003998
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003999 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004000 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004001 # TODO: sessions aren't compatible with TLSv1.3 yet
4002 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004003
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004004 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004005 stats = server_params_test(client_context, server_context,
4006 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004007 session = stats['session']
4008 self.assertTrue(session.id)
4009 self.assertGreater(session.time, 0)
4010 self.assertGreater(session.timeout, 0)
4011 self.assertTrue(session.has_ticket)
4012 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4013 self.assertGreater(session.ticket_lifetime_hint, 0)
4014 self.assertFalse(stats['session_reused'])
4015 sess_stat = server_context.session_stats()
4016 self.assertEqual(sess_stat['accept'], 1)
4017 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004018
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004019 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004020 stats = server_params_test(client_context, server_context,
4021 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004022 sess_stat = server_context.session_stats()
4023 self.assertEqual(sess_stat['accept'], 2)
4024 self.assertEqual(sess_stat['hits'], 1)
4025 self.assertTrue(stats['session_reused'])
4026 session2 = stats['session']
4027 self.assertEqual(session2.id, session.id)
4028 self.assertEqual(session2, session)
4029 self.assertIsNot(session2, session)
4030 self.assertGreaterEqual(session2.time, session.time)
4031 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004034 stats = server_params_test(client_context, server_context,
4035 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004036 self.assertFalse(stats['session_reused'])
4037 session3 = stats['session']
4038 self.assertNotEqual(session3.id, session.id)
4039 self.assertNotEqual(session3, session)
4040 sess_stat = server_context.session_stats()
4041 self.assertEqual(sess_stat['accept'], 3)
4042 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004043
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004044 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004045 stats = server_params_test(client_context, server_context,
4046 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004047 self.assertTrue(stats['session_reused'])
4048 session4 = stats['session']
4049 self.assertEqual(session4.id, session.id)
4050 self.assertEqual(session4, session)
4051 self.assertGreaterEqual(session4.time, session.time)
4052 self.assertGreaterEqual(session4.timeout, session.timeout)
4053 sess_stat = server_context.session_stats()
4054 self.assertEqual(sess_stat['accept'], 4)
4055 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004056
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004057 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004058 client_context, server_context, hostname = testing_context()
4059 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004060
Christian Heimes05d9fe32018-02-27 08:55:39 +01004061 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004062 client_context.options |= ssl.OP_NO_TLSv1_3
4063 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004064
Christian Heimesa170fa12017-09-15 20:27:30 +02004065 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004066 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004067 with client_context.wrap_socket(socket.socket(),
4068 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004069 # session is None before handshake
4070 self.assertEqual(s.session, None)
4071 self.assertEqual(s.session_reused, None)
4072 s.connect((HOST, server.port))
4073 session = s.session
4074 self.assertTrue(session)
4075 with self.assertRaises(TypeError) as e:
4076 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004077 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004078
Christian Heimesa170fa12017-09-15 20:27:30 +02004079 with client_context.wrap_socket(socket.socket(),
4080 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004081 s.connect((HOST, server.port))
4082 # cannot set session after handshake
4083 with self.assertRaises(ValueError) as e:
4084 s.session = session
4085 self.assertEqual(str(e.exception),
4086 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004087
Christian Heimesa170fa12017-09-15 20:27:30 +02004088 with client_context.wrap_socket(socket.socket(),
4089 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004090 # can set session before handshake and before the
4091 # connection was established
4092 s.session = session
4093 s.connect((HOST, server.port))
4094 self.assertEqual(s.session.id, session.id)
4095 self.assertEqual(s.session, session)
4096 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004097
Christian Heimesa170fa12017-09-15 20:27:30 +02004098 with client_context2.wrap_socket(socket.socket(),
4099 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004100 # cannot re-use session with a different SSLContext
4101 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004102 s.session = session
4103 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004104 self.assertEqual(str(e.exception),
4105 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004106
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004107
Thomas Woutersed03b412007-08-28 21:37:11 +00004108def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004109 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004110 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004111 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004112 'Mac': platform.mac_ver,
4113 'Windows': platform.win32_ver,
4114 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004115 for name, func in plats.items():
4116 plat = func()
4117 if plat and plat[0]:
4118 plat = '%s %r' % (name, plat)
4119 break
4120 else:
4121 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004122 print("test_ssl: testing with %r %r" %
4123 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4124 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004125 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004126 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4127 try:
4128 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4129 except AttributeError:
4130 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004131
Antoine Pitrou152efa22010-05-16 18:19:27 +00004132 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004133 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004134 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004135 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004136 BADCERT, BADKEY, EMPTYCERT]:
4137 if not os.path.exists(filename):
4138 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004139
Martin Panter3840b2a2016-03-27 01:53:46 +00004140 tests = [
4141 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004142 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00004143 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004144
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004145 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004146 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004147
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004148 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004149 try:
4150 support.run_unittest(*tests)
4151 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004152 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004153
4154if __name__ == "__main__":
4155 test_main()