blob: 71380797748e96049d734d859077d6a39be9131c [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file stored alongside this module.

    The positional arguments are path components that are joined onto the
    directory containing this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
Christian Heimes05d9fe32018-02-27 08:55:39 +010058WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
Christian Heimesbd5c7d22018-01-20 15:16:30 +010060CERTFILE_INFO = {
61 'issuer': ((('countryName', 'XY'),),
62 (('localityName', 'Castle Anthrax'),),
63 (('organizationName', 'Python Software Foundation'),),
64 (('commonName', 'localhost'),)),
65 'notAfter': 'Jan 17 19:09:06 2028 GMT',
66 'notBefore': 'Jan 19 19:09:06 2018 GMT',
67 'serialNumber': 'F9BA076D5B6ABD9B',
68 'subject': ((('countryName', 'XY'),),
69 (('localityName', 'Castle Anthrax'),),
70 (('organizationName', 'Python Software Foundation'),),
71 (('commonName', 'localhost'),)),
72 'subjectAltName': (('DNS', 'localhost'),),
73 'version': 3
74}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
Christian Heimes22587792013-11-21 23:56:13 +010076# empty CRL
77CRLFILE = data_file("revocation.crl")
78
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010079# Two keys and certs signed by the same CA (for SNI tests)
80SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020081SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
83SIGNED_CERTFILE_INFO = {
84 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
85 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
86 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
87 'issuer': ((('countryName', 'XY'),),
88 (('organizationName', 'Python Software Foundation CA'),),
89 (('commonName', 'our-ca-server'),)),
90 'notAfter': 'Nov 28 19:09:06 2027 GMT',
91 'notBefore': 'Jan 19 19:09:06 2018 GMT',
92 'serialNumber': '82EDBF41C880919C',
93 'subject': ((('countryName', 'XY'),),
94 (('localityName', 'Castle Anthrax'),),
95 (('organizationName', 'Python Software Foundation'),),
96 (('commonName', 'localhost'),)),
97 'subjectAltName': (('DNS', 'localhost'),),
98 'version': 3
99}
100
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100101SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200102SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100103SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
104SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
105
Martin Panter3840b2a2016-03-27 01:53:46 +0000106# Same certificate as pycacert.pem, but without extra text in file
107SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200108# cert with all kinds of subject alt names
109ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100110IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100111
Martin Panter3d81d932016-01-14 09:36:00 +0000112REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000113
114EMPTYCERT = data_file("nullcert.pem")
115BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000116NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000117BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200118NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200119NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400121DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write the exception currently being handled to stdout in verbose mode.

    The traceback of the active exception (as reported by sys.exc_info())
    is formatted and, only when support.verbose is set, written to stdout
    preceded by *prefix*.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    lines = traceback.format_exception(exc_type, exc_value, exc_tb)
    if support.verbose:
        sys.stdout.write(prefix + ' '.join(lines))
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    # Version tuple corresponding to 0.9.8m.
    minimum_api_version = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL library version is 0.9.7h or higher."""
    # Version tuple corresponding to 0.9.7h.
    minimum_version = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
144
def have_verify_flags():
    """Return True when the OpenSSL library version is 0.9.8 or higher."""
    # Version tuple corresponding to 0.9.8.
    minimum_version = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
148
def _have_secp_curves():
    """Return True if an SSLContext accepts the "secp384r1" ECDH curve.

    Returns False when ssl.HAS_ECDH is false, or when set_ecdh_curve()
    rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        probe.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    return True


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset (time.altzone) is
    used when the platform defines a DST zone and DST is currently in
    effect; otherwise the standard offset (time.timezone) is used.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
169
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    For every other OpenSSL API version *cert_time* is returned unchanged.
    On 0.9.8i the seconds field is zeroed and a leading zero in the day of
    the month is replaced by a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    ignores_seconds = ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15)
    if ignores_seconds:
        time_format = "%b %d %H:%M:%S %Y GMT"
        parsed = datetime.datetime.strptime(cert_time, time_format)
        cert_time = parsed.replace(second=0).strftime(time_format)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100184needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
185
Antoine Pitrou23df4832010-08-04 17:14:06 +0000186
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly configured SSLContext.

    A context is created for *ssl_version* and configured from the
    keyword-only arguments (each step is skipped when its argument is
    None); any remaining keyword arguments are forwarded to
    SSLContext.wrap_socket().  Returns the wrapped socket.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # check_hostname is switched off before selecting CERT_NONE.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    # Load a cert chain as soon as either half of the pair is supplied.
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    wrapped = ctx.wrap_socket(sock, **kwargs)
    return wrapped
203
Christian Heimesa170fa12017-09-15 20:27:30 +0200204
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one associated
    with the chosen certificate.  Both contexts load SIGNING_CA as their
    trusted CA, and the server context also loads *server_cert* as its
    certificate chain.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # The CA must be loaded into the *server* context here; loading it a
    # second time into the client context was a redundant no-op.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
225
226
Antoine Pitrou152efa22010-05-16 18:19:27 +0000227class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000228
def test_constants(self):
    # Check that the expected constants are exposed by the ssl module.
    def require(*names):
        # Plain attribute access: a missing constant raises AttributeError.
        for name in names:
            getattr(ssl, name)

    require("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
            "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE")
    if ssl.HAS_ECDH:
        require("OP_SINGLE_ECDH_USE")
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        require("OP_NO_COMPRESSION")
    for flag in (ssl.HAS_SNI, ssl.HAS_ECDH):
        self.assertIn(flag, {True, False})
    require("OP_NO_SSLv2", "OP_NO_SSLv3", "OP_NO_TLSv1", "OP_NO_TLSv1_3")
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        require("OP_NO_TLSv1_1", "OP_NO_TLSv1_2")
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000249
def test_private_init(self):
    # Calling the SSLSocket constructor directly must raise a TypeError
    # whose message mentions "public constructor".
    with self.assertRaisesRegex(TypeError, "public constructor"), \
            socket.socket() as raw_sock:
        ssl.SSLSocket(raw_sock)
254
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs, and that a context reports the very same protocol object
    # it was created with.
    protocol = ssl.PROTOCOL_TLS
    self.assertEqual(str(protocol), '_SSLMethod.PROTOCOL_TLS')
    context = ssl.SSLContext(protocol)
    self.assertIs(context.protocol, protocol)
262
def test_random(self):
    # Exercise the RAND_* helpers of the ssl module.
    rand_status = ssl.RAND_status()
    if support.verbose:
        if rand_status:
            verdict = "sufficient randomness"
        else:
            verdict = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n"
                         % (rand_status, verdict))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, rand_status == 1)
    if rand_status:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)
    else:
        # Without enough entropy RAND_bytes() is expected to fail.
        with self.assertRaises(ssl.SSLError):
            ssl.RAND_bytes(16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        with self.assertRaises(ValueError):
            generator(-5)

    # RAND_egd is only checked when the module provides it.
    if hasattr(ssl, 'RAND_egd'):
        with self.assertRaises(TypeError):
            ssl.RAND_egd(1)
        with self.assertRaises(TypeError):
            ssl.RAND_egd('foo', 1)

    # RAND_add() is called with str, bytes and bytearray input.
    seeds = (
        "this is a random string",
        b"this is a random bytes object",
        bytearray(b"this is a random bytearray object"),
    )
    for seed in seeds:
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000289
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # After a fork, parent and child must not produce the same
    # pseudo-random bytes.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_fd, write_fd = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: send 16 random bytes through the pipe and leave via
        # os._exit() so that this branch never returns.
        try:
            os.close(read_fd)
            child_random, _ = ssl.RAND_pseudo_bytes(16)
            self.assertEqual(len(child_random), 16)
            os.write(write_fd, child_random)
            os.close(write_fd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)

    # Parent: the child branch above always exits the process.
    os.close(write_fd)
    self.addCleanup(os.close, read_fd)
    _, exit_status = os.waitpid(child_pid, 0)
    self.assertEqual(exit_status, 0)

    child_random = os.read(read_fd, 16)
    self.assertEqual(len(child_random), 16)
    parent_random, _ = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(parent_random), 16)

    self.assertNotEqual(child_random, parent_random)
321
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    self.assertEqual(decode(CERTFILE), CERTFILE_INFO)
    self.assertEqual(decode(SIGNED_CERTFILE), SIGNED_CERTFILE_INFO)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (
        ('DNS', 'projects.developer.nokia.com'),
        ('DNS', 'projects.forum.nokia.com'),
    )
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        nokia['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        nokia['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
def test_parse_cert_CVE_2013_4238(self):
    # The decoded names of this certificate are expected to keep their
    # embedded NUL characters rather than being cut short at them.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    # Subject and issuer are expected to be the same name.
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200379
def test_parse_all_sans(self):
    # Decode a certificate carrying many kinds of subjectAltName entries
    # and compare the parsed tuple entry by entry, in order.
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    dirname_value = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', dirname_value),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
400
def test_DER_to_PEM(self):
    # Convert PEM -> DER -> PEM -> DER: both DER results must be equal
    # and the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as f:
        original_pem = f.read()
    der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(der)
    der_again = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(der, der_again)

    expected_head = ssl.PEM_HEADER + '\n'
    expected_tail = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_head):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    if not regenerated_pem.endswith(expected_tail):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000412
def test_openssl_version(self):
    # Sanity-check the three representations of the library version.
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    self.assertIsInstance(number, int)
    self.assertIsInstance(info, tuple)
    self.assertIsInstance(text, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    # minor and fix share the same bounds
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    # (the tuple passed as msg is shown on failure to help diagnosis)
    diagnostics = (text, info, hex(number))
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix), diagnostics)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000443
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    # Dropping the only strong reference must leave the weak reference
    # dead straight away.
    raw_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(raw_sock)
    weak = weakref.ref(wrapped)
    # check_warnings is asked to catch a ResourceWarning during the del.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(weak(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000454
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    raw_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(raw_sock) as ss:
        oserror_calls = (
            (ss.recv, (1,)),
            (ss.recv_into, (bytearray(b'x'),)),
            (ss.recvfrom, (1,)),
            (ss.recvfrom_into, (bytearray(b'x'), 1)),
            (ss.send, (b'x',)),
            (ss.sendto, (b'x', ('0.0.0.0', 0))),
        )
        for method, args in oserror_calls:
            self.assertRaises(OSError, method, *args)
        # sendmsg() is expected to raise NotImplementedError instead.
        with self.assertRaises(NotImplementedError):
            ss.sendmsg([b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000468
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    candidate_timeouts = (None, 0.0, 5.0)
    for expected in candidate_timeouts:
        raw_sock = socket.socket(socket.AF_INET)
        raw_sock.settimeout(expected)
        with test_wrap_socket(raw_sock) as wrapped:
            self.assertEqual(expected, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000477
def test_errors_sslwrap(self):
    # Invalid argument combinations given to ssl.wrap_socket() must be
    # rejected with ValueError.
    sock = socket.socket()
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)
    server_side_msg = "certfile must be specified for server-side operations"
    with self.assertRaisesRegex(ValueError, server_side_msg):
        ssl.wrap_socket(sock, server_side=True)
    with self.assertRaisesRegex(ValueError, server_side_msg):
        ssl.wrap_socket(sock, server_side=True, certfile="")

    # A server-side socket must refuse to connect().
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))

    # Nonexistent certificate and/or key files must surface as OSError
    # with errno ENOENT.
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000506
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # *certfile* is resolved relative to this module's directory, falling
    # back to the current directory when __file__ has no directory part.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000516
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    # bad_cert_test() asserts that wrapping raises ssl.SSLError.
    empty_cert_name = "nullcert.pem"
    self.bad_cert_test(empty_cert_name)
520
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    # bad_cert_test() asserts that wrapping raises ssl.SSLError.
    malformed_cert_name = "badcert.pem"
    self.bad_cert_test(malformed_cert_name)
524
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    # bad_cert_test() asserts that wrapping raises ssl.SSLError.
    malformed_key_name = "badkey.pem"
    self.bad_cert_test(malformed_key_name)
528
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000529 def test_match_hostname(self):
530 def ok(cert, hostname):
531 ssl.match_hostname(cert, hostname)
532 def fail(cert, hostname):
533 self.assertRaises(ssl.CertificateError,
534 ssl.match_hostname, cert, hostname)
535
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100536 # -- Hostname matching --
537
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000538 cert = {'subject': ((('commonName', 'example.com'),),)}
539 ok(cert, 'example.com')
540 ok(cert, 'ExAmple.cOm')
541 fail(cert, 'www.example.com')
542 fail(cert, '.example.com')
543 fail(cert, 'example.org')
544 fail(cert, 'exampleXcom')
545
546 cert = {'subject': ((('commonName', '*.a.com'),),)}
547 ok(cert, 'foo.a.com')
548 fail(cert, 'bar.foo.a.com')
549 fail(cert, 'a.com')
550 fail(cert, 'Xa.com')
551 fail(cert, '.a.com')
552
Mandeep Singhede2ac92017-11-27 04:01:27 +0530553 # only match wildcards when they are the only thing
554 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530556 fail(cert, 'foo.com')
557 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000558 fail(cert, 'bar.com')
559 fail(cert, 'foo.a.com')
560 fail(cert, 'bar.foo.com')
561
Christian Heimes824f7f32013-08-17 00:54:47 +0200562 # NULL bytes are bad, CVE-2013-4073
563 cert = {'subject': ((('commonName',
564 'null.python.org\x00example.org'),),)}
565 ok(cert, 'null.python.org\x00example.org') # or raise an error?
566 fail(cert, 'example.org')
567 fail(cert, 'null.python.org')
568
Georg Brandl72c98d32013-10-27 07:16:53 +0100569 # error cases with wildcards
570 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
571 fail(cert, 'bar.foo.a.com')
572 fail(cert, 'a.com')
573 fail(cert, 'Xa.com')
574 fail(cert, '.a.com')
575
576 cert = {'subject': ((('commonName', 'a.*.com'),),)}
577 fail(cert, 'a.foo.com')
578 fail(cert, 'a..com')
579 fail(cert, 'a.com')
580
581 # wildcard doesn't match IDNA prefix 'xn--'
582 idna = 'püthon.python.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
584 ok(cert, idna)
585 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
586 fail(cert, idna)
587 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
588 fail(cert, idna)
589
590 # wildcard in first fragment and IDNA A-labels in sequent fragments
591 # are supported.
592 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
593 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
595 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100596 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
597 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
598
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 # Slightly fake real-world example
600 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
601 'subject': ((('commonName', 'linuxfrz.org'),),),
602 'subjectAltName': (('DNS', 'linuxfr.org'),
603 ('DNS', 'linuxfr.com'),
604 ('othername', '<unsupported>'))}
605 ok(cert, 'linuxfr.org')
606 ok(cert, 'linuxfr.com')
607 # Not a "DNS" entry
608 fail(cert, '<unsupported>')
609 # When there is a subjectAltName, commonName isn't used
610 fail(cert, 'linuxfrz.org')
611
612 # A pristine real-world example
613 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
614 'subject': ((('countryName', 'US'),),
615 (('stateOrProvinceName', 'California'),),
616 (('localityName', 'Mountain View'),),
617 (('organizationName', 'Google Inc'),),
618 (('commonName', 'mail.google.com'),))}
619 ok(cert, 'mail.google.com')
620 fail(cert, 'gmail.com')
621 # Only commonName is considered
622 fail(cert, 'California')
623
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100624 # -- IPv4 matching --
625 cert = {'subject': ((('commonName', 'example.com'),),),
626 'subjectAltName': (('DNS', 'example.com'),
627 ('IP Address', '10.11.12.13'),
628 ('IP Address', '14.15.16.17'))}
629 ok(cert, '10.11.12.13')
630 ok(cert, '14.15.16.17')
631 fail(cert, '14.15.16.18')
632 fail(cert, 'example.net')
633
634 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100635 if hasattr(socket, 'AF_INET6'):
636 cert = {'subject': ((('commonName', 'example.com'),),),
637 'subjectAltName': (
638 ('DNS', 'example.com'),
639 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
640 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
641 ok(cert, '2001::cafe')
642 ok(cert, '2003::baba')
643 fail(cert, '2003::bebe')
644 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100645
646 # -- Miscellaneous --
647
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000648 # Neither commonName nor subjectAltName
649 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
650 'subject': ((('countryName', 'US'),),
651 (('stateOrProvinceName', 'California'),),
652 (('localityName', 'Mountain View'),),
653 (('organizationName', 'Google Inc'),))}
654 fail(cert, 'mail.google.com')
655
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200656 # No DNS entry in subjectAltName but a commonName
657 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('commonName', 'mail.google.com'),)),
662 'subjectAltName': (('othername', 'blabla'), )}
663 ok(cert, 'mail.google.com')
664
665 # No DNS entry subjectAltName and no commonName
666 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),)),
671 'subjectAltName': (('othername', 'blabla'),)}
672 fail(cert, 'google.com')
673
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000674 # Empty cert / no cert
675 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
676 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
677
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200678 # Issue #17980: avoid denials of service by refusing more than one
679 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100680 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
681 with self.assertRaisesRegex(
682 ssl.CertificateError,
683 "partial wildcards in leftmost label are not supported"):
684 ssl.match_hostname(cert, 'axxb.example.com')
685
686 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
687 with self.assertRaisesRegex(
688 ssl.CertificateError,
689 "wildcard can only be present in the leftmost label"):
690 ssl.match_hostname(cert, 'www.sub.example.com')
691
692 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
693 with self.assertRaisesRegex(
694 ssl.CertificateError,
695 "too many wildcards"):
696 ssl.match_hostname(cert, 'axxbxxc.example.com')
697
698 cert = {'subject': ((('commonName', '*'),),)}
699 with self.assertRaisesRegex(
700 ssl.CertificateError,
701 "sole wildcard without additional labels are not support"):
702 ssl.match_hostname(cert, 'host')
703
704 cert = {'subject': ((('commonName', '*.com'),),)}
705 with self.assertRaisesRegex(
706 ssl.CertificateError,
707 r"hostname 'com' doesn't match '\*.com'"):
708 ssl.match_hostname(cert, 'com')
709
710 # extra checks for _inet_paton()
711 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
712 with self.assertRaises(ValueError):
713 ssl._inet_paton(invalid)
714 for ipaddr in ['127.0.0.1', '192.168.0.1']:
715 self.assertTrue(ssl._inet_paton(ipaddr))
716 if hasattr(socket, 'AF_INET6'):
717 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
718 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200719
Antoine Pitroud5323212010-10-22 18:19:07 +0000720 def test_server_side(self):
721 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200722 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000723 with socket.socket() as sock:
724 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
725 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000726
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.socket(socket.AF_INET)
    # Register the cleanup right away so the listening socket is released
    # even when a later step (bind, connect, wrap, or the assertion) fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Same for the client: if connect() or the wrap raises, nothing else
    # would close it.  Closing it again after the SSL wrapper has taken
    # over the descriptor is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200738
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # A known binding type yields None while the socket is unconnected;
    # check the client-side wrap first, then the server-side one.
    wrap_variants = (
        {},
        {'server_side': True, 'certfile': CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_kwargs) as wrapped:
            self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200750
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed SSL socket must emit a
    # ResourceWarning whose message includes the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
758
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths tuple.
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    # SSL_CERT_DIR / SSL_CERT_FILE override the reported locations.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
770
Christian Heimes44109d72013-11-22 01:51:30 +0100771 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
772 def test_enum_certificates(self):
773 self.assertTrue(ssl.enum_certificates("CA"))
774 self.assertTrue(ssl.enum_certificates("ROOT"))
775
776 self.assertRaises(TypeError, ssl.enum_certificates)
777 self.assertRaises(WindowsError, ssl.enum_certificates, "")
778
Christian Heimesc2d65e12013-11-22 16:13:55 +0100779 trust_oids = set()
780 for storename in ("CA", "ROOT"):
781 store = ssl.enum_certificates(storename)
782 self.assertIsInstance(store, list)
783 for element in store:
784 self.assertIsInstance(element, tuple)
785 self.assertEqual(len(element), 3)
786 cert, enc, trust = element
787 self.assertIsInstance(cert, bytes)
788 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
789 self.assertIsInstance(trust, (set, bool))
790 if isinstance(trust, set):
791 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100792
793 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100794 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200795
Christian Heimes46bebee2013-06-09 19:03:31 +0200796 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100797 def test_enum_crls(self):
798 self.assertTrue(ssl.enum_crls("CA"))
799 self.assertRaises(TypeError, ssl.enum_crls)
800 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200801
Christian Heimes44109d72013-11-22 01:51:30 +0100802 crls = ssl.enum_crls("CA")
803 self.assertIsInstance(crls, list)
804 for element in crls:
805 self.assertIsInstance(element, tuple)
806 self.assertEqual(len(element), 2)
807 self.assertIsInstance(element[0], bytes)
808 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200809
Christian Heimes46bebee2013-06-09 19:03:31 +0200810
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100811 def test_asn1object(self):
812 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
813 '1.3.6.1.5.5.7.3.1')
814
815 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
816 self.assertEqual(val, expected)
817 self.assertEqual(val.nid, 129)
818 self.assertEqual(val.shortname, 'serverAuth')
819 self.assertEqual(val.longname, 'TLS Web Server Authentication')
820 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
821 self.assertIsInstance(val, ssl._ASN1Object)
822 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
823
824 val = ssl._ASN1Object.fromnid(129)
825 self.assertEqual(val, expected)
826 self.assertIsInstance(val, ssl._ASN1Object)
827 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100828 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
829 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100830 for i in range(1000):
831 try:
832 obj = ssl._ASN1Object.fromnid(i)
833 except ValueError:
834 pass
835 else:
836 self.assertIsInstance(obj.nid, int)
837 self.assertIsInstance(obj.shortname, str)
838 self.assertIsInstance(obj.longname, str)
839 self.assertIsInstance(obj.oid, (str, type(None)))
840
841 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
845 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
846 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100847 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
848 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100849
Christian Heimes72d28502013-11-23 13:56:58 +0100850 def test_purpose_enum(self):
851 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
852 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
853 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
854 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
855 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
856 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
857 '1.3.6.1.5.5.7.3.1')
858
859 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
860 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
861 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
862 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
863 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
864 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
865 '1.3.6.1.5.5.7.3.2')
866
def test_unsupported_dtls(self):
    # Datagram sockets cannot be wrapped, neither through the module-level
    # helper nor through an explicit context.
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)
    expected_message = "only stream sockets are supported"

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception), expected_message)
877
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must convert to exactly *timestamp*.
    converted = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(converted, timestamp)
880
def cert_time_fail(self, timestring):
    # Helper: converting *timestring* must be rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
884
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() used to return wrong
    # results when the local timezone is not UTC.
    known_stamps = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, seconds in known_stamps:
        self.cert_time_ok(cert_time, seconds)
892
def test_cert_time_to_seconds(self):
    cert_time = "Jan 5 09:34:43 2018 GMT"
    expected = 1515144883.0
    self.cert_time_ok(cert_time, expected)
    # the argument may be given by keyword, and that keyword is cert_time
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=cert_time), expected)
    # both %e and %d day forms are accepted (strftime pads with a space
    # or a zero), and month / zone matching is case-insensitive
    for accepted in ("Jan 05 09:34:43 2018 GMT",
                     "JaN 5 09:34:43 2018 GmT"):
        self.cert_time_ok(accepted, expected)

    malformed = (
        "Jan 5 09:34 2018 GMT",       # no seconds
        "Jan 5 09:34:43 2018",        # no GMT
        "Jan 5 09:34:43 2018 UTC",    # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",   # invalid day
        "Jon 5 09:34:43 2018 GMT",    # invalid month
        "Jan 5 24:00:00 2018 GMT",    # invalid hour
        "Jan 5 09:60:43 2018 GMT",    # invalid minute
    )
    for rejected in malformed:
        self.cert_time_fail(rejected)

    # a leap second maps onto the same timestamp as the second after it
    new_year = 1230768000.0
    for stamp in ("Dec 31 23:59:60 2008 GMT", "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(stamp, new_year)

    seconds_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # the 60th second is allowed even when it is not a leap second
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # a 2nd leap second is allowed for time.strptime() compatibility
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for stamp, seconds in seconds_cases:
        self.cert_time_ok(stamp, seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # the rfc 5280 special value 99991231235959Z gets no special
    # treatment
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
927
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # cert_time_to_seconds() must not depend on the current locale.

    # Abbreviated February name as spelled by the active locale.
    localized_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if localized_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is always understood ...
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    # ... whereas the localized spelling is refused.
    self.cert_time_fail(localized_feb + " 9 00:00:00 2007 GMT")
942
def test_connect_ex_error(self):
    # Reserve a port without listening on it, so that connecting fails.
    idle_server = socket.socket(socket.AF_INET)
    self.addCleanup(idle_server.close)
    reserved_port = support.bind_port(idle_server)

    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, reserved_port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (errno.ECONNREFUSED, errno.EHOSTUNREACH,
                  errno.ETIMEDOUT, errno.EWOULDBLOCK)
    self.assertIn(result, acceptable)
958
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100959
Antoine Pitrou152efa22010-05-16 18:19:27 +0000960class ContextTests(unittest.TestCase):
961
962 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100963 for protocol in PROTOCOLS:
964 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200965 ctx = ssl.SSLContext()
966 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000967 self.assertRaises(ValueError, ssl.SSLContext, -1)
968 self.assertRaises(ValueError, ssl.SSLContext, 42)
969
def test_protocol(self):
    # The protocol attribute reports what the context was created with.
    for requested in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(requested).protocol, requested)
974
975 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200976 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977 ctx.set_ciphers("ALL")
978 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000979 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000980 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000981
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in the name of a default cipher
    # suite.
    banned_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in banned_markers:
            self.assertNotIn(marker, suite_name)
994
Christian Heimes25bfcd52016-09-06 00:04:45 +0200995 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
996 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200997 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200998 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200999 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001000 self.assertIn('AES256-GCM-SHA384', names)
1001 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001002
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    context_defaults = (
        OP_NO_COMPRESSION,
        OP_CIPHER_SERVER_PREFERENCE,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_ENABLE_MIDDLEBOX_COMPAT,
    )
    for flag in context_defaults:
        expected |= flag
    self.assertEqual(expected, client_ctx.options)

    # Options can always be added.
    client_ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    # Clearing them again depends on what can_clear_options() reports.
    if not can_clear_options():
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return
    client_ctx.options = client_ctx.options & ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1023
Christian Heimesa170fa12017-09-15 20:27:30 +02001024 def test_verify_mode_protocol(self):
1025 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026 # Default value
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1028 ctx.verify_mode = ssl.CERT_OPTIONAL
1029 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1030 ctx.verify_mode = ssl.CERT_REQUIRED
1031 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1032 ctx.verify_mode = ssl.CERT_NONE
1033 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1034 with self.assertRaises(TypeError):
1035 ctx.verify_mode = None
1036 with self.assertRaises(ValueError):
1037 ctx.verify_mode = 42
1038
Christian Heimesa170fa12017-09-15 20:27:30 +02001039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1040 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1041 self.assertFalse(ctx.check_hostname)
1042
1043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1045 self.assertTrue(ctx.check_hostname)
1046
Christian Heimes61d478c2018-01-27 15:51:38 +01001047 def test_hostname_checks_common_name(self):
1048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1049 self.assertTrue(ctx.hostname_checks_common_name)
1050 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1051 ctx.hostname_checks_common_name = True
1052 self.assertTrue(ctx.hostname_checks_common_name)
1053 ctx.hostname_checks_common_name = False
1054 self.assertFalse(ctx.hostname_checks_common_name)
1055 ctx.hostname_checks_common_name = True
1056 self.assertTrue(ctx.hostname_checks_common_name)
1057 else:
1058 with self.assertRaises(AttributeError):
1059 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001060
Christian Heimes698dde12018-02-27 11:54:43 +01001061 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1062 "required OpenSSL 1.1.0g")
1063 def test_min_max_version(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1065 self.assertEqual(
1066 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1067 )
1068 self.assertEqual(
1069 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1070 )
1071
1072 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1073 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1074 self.assertEqual(
1075 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1076 )
1077 self.assertEqual(
1078 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1079 )
1080
1081 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1082 ctx.maximum_version = ssl.TLSVersion.TLSv1
1083 self.assertEqual(
1084 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1085 )
1086 self.assertEqual(
1087 ctx.maximum_version, ssl.TLSVersion.TLSv1
1088 )
1089
1090 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1091 self.assertEqual(
1092 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1093 )
1094
1095 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1096 self.assertIn(
1097 ctx.maximum_version,
1098 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1102 self.assertIn(
1103 ctx.minimum_version,
1104 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1105 )
1106
1107 with self.assertRaises(ValueError):
1108 ctx.minimum_version = 42
1109
1110 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1111
1112 self.assertEqual(
1113 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1114 )
1115 self.assertEqual(
1116 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1117 )
1118 with self.assertRaises(ValueError):
1119 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1120 with self.assertRaises(ValueError):
1121 ctx.maximum_version = ssl.TLSVersion.TLSv1
1122
1123
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value (VERIFY_X509_TRUSTED_FIRST is included when the ssl
    # module defines it)
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # each individual flag round-trips through the attribute
    for flags in (ssl.VERIFY_CRL_CHECK_LEAF,
                  ssl.VERIFY_CRL_CHECK_CHAIN,
                  ssl.VERIFY_DEFAULT):
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    # supports any value
    server_ctx.verify_flags = (ssl.VERIFY_CRL_CHECK_LEAF
                               | ssl.VERIFY_X509_STRICT)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1143
def test_load_cert_chain(self):
    def fresh_ctx():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    def assert_pem_error(context, *args, **kwargs):
        # The load attempt must fail with an SSLError mentioning "PEM lib".
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(*args, **kwargs)

    ctx = fresh_ctx()
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for broken_file in (BADCERT, EMPTYCERT):
        assert_pem_error(ctx, broken_file)

    # Separate key and cert
    ctx = fresh_ctx()
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    assert_pem_error(ctx, ONLYCERT)
    assert_pem_error(ctx, ONLYKEY)
    assert_pem_error(ctx, certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = fresh_ctx()
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: str, bytes and bytearray passwords
    # are accepted, by keyword and positionally.
    password_forms = (
        KEY_PASSWORD,
        KEY_PASSWORD.encode(),
        bytearray(KEY_PASSWORD.encode()),
    )
    for secret in password_forms:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms:
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD

    def raising_callback():
        raise Exception('getpass error')

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        GetPassCallable(),
        GetPassCallable().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=raising_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=raising_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001226
def test_load_verify_locations(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # str and bytes paths are accepted, positionally and by keyword
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        server_ctx.load_verify_locations(ca_file)
        server_ctx.load_verify_locations(cafile=ca_file, capath=None)
    # at least one real location is required
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        server_ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        server_ctx.load_verify_locations(BADCERT)
    server_ctx.load_verify_locations(CERTFILE, CAPATH)
    server_ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, True)
1245
def test_load_verify_cadata(self):
    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_client_ctx():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        # Number of CA certificates currently held by the context's store.
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # test cadata: each certificate in both PEM (text) and DER (bytes) form
    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    ctx = new_client_ctx()
    assert_ca_count(ctx, 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(ctx, 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_ctx()
    ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(ctx, 2)

    # with junk around the certs
    ctx = new_client_ctx()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(ctx, 2)

    # test DER
    ctx = new_client_ctx()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_ctx()
    ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(ctx, 2)

    # error cases
    ctx = new_client_ctx()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1302
1303
def test_load_dh_params(self):
    # load_dh_params() must accept a str path (and a bytes path outside
    # Windows) and reject a missing argument, None, a nonexistent file
    # and CERTFILE.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    if os.name != 'nt':
        ctx.load_dh_params(BYTES_DHFILE)
    self.assertRaises(TypeError, ctx.load_dh_params)
    self.assertRaises(TypeError, ctx.load_dh_params, None)
    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # Only the exception type matters here, so the context manager is not
    # bound to a name (the original bound an unused ``cm``).
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1316
def test_session_stats(self):
    # A freshly created context reports zero for every session counter,
    # whichever protocol it was created with.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1333
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001334 def test_set_default_verify_paths(self):
1335 # There's not much we can do to test that it acts as expected,
1336 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001337 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001338 ctx.set_default_verify_paths()
1339
Antoine Pitrou501da612011-12-21 09:27:41 +01001340 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001341 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001342 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001343 ctx.set_ecdh_curve("prime256v1")
1344 ctx.set_ecdh_curve(b"prime256v1")
1345 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1346 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1347 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1348 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1349
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    # Both None and a real callable must be accepted.
    context.set_servername_callback(None)
    context.set_servername_callback(noop_callback)
1364
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback refer back to the context
    # that stores it, which closes the cycle.
    def callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)
    del context, callback
    gc.collect()
    self.assertIsNone(context_ref())
1377
def test_cert_store_stats(self):
    # Follow cert_store_stats() while certificates are loaded step by step.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect(x509_ca, x509):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    expect(x509_ca=0, x509=0)
    # load_cert_chain() is expected to leave the counters untouched
    context.load_cert_chain(CERTFILE)
    expect(x509_ca=0, x509=0)
    context.load_verify_locations(CERTFILE)
    expect(x509_ca=0, x509=1)
    context.load_verify_locations(CAFILE_CACERT)
    expect(x509_ca=1, x509=2)
1391
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # The expected issuer and subject are the same distinguished name.
    root_ca_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': root_ca_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_ca_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # With a true argument the certificates are compared against DER bytes.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1419
Christian Heimes72d28502013-11-23 13:56:58 +01001420 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001421 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001422 ctx.load_default_certs()
1423
Christian Heimesa170fa12017-09-15 20:27:30 +02001424 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001425 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1426 ctx.load_default_certs()
1427
Christian Heimesa170fa12017-09-15 20:27:30 +02001428 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001429 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1430
Christian Heimesa170fa12017-09-15 20:27:30 +02001431 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001432 self.assertRaises(TypeError, ctx.load_default_certs, None)
1433 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1434
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # Point SSL_CERT_DIR and SSL_CERT_FILE at the test certificates and
    # check what load_default_certs() puts into the store.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1444
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what a context loads without the environment overrides.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    # With the overrides set, exactly one more x509 entry is expected.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1459
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set in ctx.options.  Each of the other
    # flags is only checked when its module-level value is non-zero.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1474
def test_create_default_context(self):
    # Default purpose: certificates are required and hostnames checked.
    default_context = ssl.create_default_context()

    self.assertEqual(default_context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(default_context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(default_context.check_hostname)
    self._assert_context_options(default_context)

    # Passing cafile, capath and cadata together is accepted.
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    explicit_context = ssl.create_default_context(cafile=SIGNING_CA,
                                                  capath=CAPATH,
                                                  cadata=ca_pem)
    self.assertEqual(explicit_context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(explicit_context.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(explicit_context)

    # Purpose.CLIENT_AUTH: no certificate is requested.
    client_auth_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(client_auth_context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(client_auth_context.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(client_auth_context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001495
def test__create_stdlib_context(self):
    def verify(context, protocol, verify_mode, hostname_checked=None):
        # Shared assertions; check_hostname is only inspected on request.
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    # No arguments: no verification and no hostname check
    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    # Explicit protocol only
    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol with verification and hostname checking
    strict_context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                                cert_reqs=ssl.CERT_REQUIRED,
                                                check_hostname=True)
    verify(strict_context,
           ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED, hostname_checked=True)

    # Purpose given by keyword
    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001520
Christian Heimes1aa9a752013-12-02 02:41:19 +01001521 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001522 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001523 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001524 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001525
Christian Heimese82c0342017-09-15 20:29:57 +02001526 # Auto set CERT_REQUIRED
1527 ctx.check_hostname = True
1528 self.assertTrue(ctx.check_hostname)
1529 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1530 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001531 ctx.verify_mode = ssl.CERT_REQUIRED
1532 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001534
Christian Heimese82c0342017-09-15 20:29:57 +02001535 # Changing verify_mode does not affect check_hostname
1536 ctx.check_hostname = False
1537 ctx.verify_mode = ssl.CERT_NONE
1538 ctx.check_hostname = False
1539 self.assertFalse(ctx.check_hostname)
1540 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1541 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 ctx.check_hostname = True
1543 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1545
1546 ctx.check_hostname = False
1547 ctx.verify_mode = ssl.CERT_OPTIONAL
1548 ctx.check_hostname = False
1549 self.assertFalse(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1551 # keep CERT_OPTIONAL
1552 ctx.check_hostname = True
1553 self.assertTrue(ctx.check_hostname)
1554 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
1556 # Cannot set CERT_NONE with check_hostname enabled
1557 with self.assertRaises(ValueError):
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001561 ctx.verify_mode = ssl.CERT_NONE
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563
Christian Heimes5fe668c2016-09-12 00:01:11 +02001564 def test_context_client_server(self):
1565 # PROTOCOL_TLS_CLIENT has sane defaults
1566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1567 self.assertTrue(ctx.check_hostname)
1568 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1569
1570 # PROTOCOL_TLS_SERVER has different but also sane defaults
1571 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1572 self.assertFalse(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1574
Christian Heimes4df60f12017-09-15 20:26:05 +02001575 def test_context_custom_class(self):
1576 class MySSLSocket(ssl.SSLSocket):
1577 pass
1578
1579 class MySSLObject(ssl.SSLObject):
1580 pass
1581
1582 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1583 ctx.sslsocket_class = MySSLSocket
1584 ctx.sslobject_class = MySSLObject
1585
1586 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1587 self.assertIsInstance(sock, MySSLSocket)
1588 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1589 self.assertIsInstance(obj, MySSLObject)
1590
Antoine Pitrou152efa22010-05-16 18:19:27 +00001591
class SSLErrorTests(unittest.TestCase):
    # Checks on ssl.SSLError: its str(), its attributes, which subclass is
    # raised, and rejection of invalid server_hostname values.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            raw_client = socket.socket()
            # Make sure the plain socket is closed even if connect() or
            # wrap_socket() raises before the ``with`` block below takes
            # over.  Closing it again after a successful wrap is harmless.
            self.addCleanup(raw_client.close)
            raw_client.connect(listener.getsockname())
            raw_client.setblocking(False)
            with ctx.wrap_socket(raw_client, False,
                                 do_handshake_on_connect=False) as client:
                # The listener never accepts or answers, so the
                # non-blocking handshake has to report "want read".
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    client.do_handshake()
                message = str(cm.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # Empty or dot-prefixed hostnames raise ValueError; a hostname with
        # an embedded NUL character raises TypeError.
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001646
1647
class MemoryBIOTests(unittest.TestCase):
    # Read/write, EOF, pending-count and argument-type behaviour of
    # ssl.MemoryBIO.

    def test_read_write(self):
        membio = ssl.MemoryBIO()
        # A single write is returned by one read and the buffer is then empty
        membio.write(b'foo')
        self.assertEqual(membio.read(), b'foo')
        self.assertEqual(membio.read(), b'')
        # Consecutive writes are concatenated
        for chunk in (b'foo', b'bar'):
            membio.write(chunk)
        self.assertEqual(membio.read(), b'foobar')
        self.assertEqual(membio.read(), b'')
        # Sized reads return at most the requested number of bytes
        membio.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(membio.read(size), expected)

    def test_eof(self):
        membio = ssl.MemoryBIO()
        # An empty buffer is not at EOF, before or after a read
        self.assertFalse(membio.eof)
        self.assertEqual(membio.read(), b'')
        self.assertFalse(membio.eof)
        membio.write(b'foo')
        self.assertFalse(membio.eof)
        # write_eof() only takes effect once the buffered data is drained
        membio.write_eof()
        self.assertFalse(membio.eof)
        self.assertEqual(membio.read(2), b'fo')
        self.assertFalse(membio.eof)
        self.assertEqual(membio.read(1), b'o')
        self.assertTrue(membio.eof)
        self.assertEqual(membio.read(), b'')
        self.assertTrue(membio.eof)

    def test_pending(self):
        membio = ssl.MemoryBIO()
        self.assertEqual(membio.pending, 0)
        membio.write(b'foo')
        self.assertEqual(membio.pending, 3)
        # Each one-byte read lowers the pending count by one
        for remaining in (2, 1, 0):
            membio.read(1)
            self.assertEqual(membio.pending, remaining)
        # Each one-byte write raises it by one
        for buffered in (1, 2, 3):
            membio.write(b'x')
            self.assertEqual(membio.pending, buffered)
        membio.read()
        self.assertEqual(membio.pending, 0)

    def test_buffer_types(self):
        membio = ssl.MemoryBIO()
        accepted = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in accepted:
            membio.write(payload)
            self.assertEqual(membio.read(), expected)

    def test_error_types(self):
        membio = ssl.MemoryBIO()
        for rejected in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                membio.write(rejected)
1709
1710
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # Calling the SSLObject constructor directly must raise a
        # TypeError whose message mentions "public constructor".
        membio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(membio, membio)
1716
1717
Martin Panter3840b2a2016-03-27 01:53:46 +00001718class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001719 """Tests that connect to a simple server running in the background"""
1720
def setUp(self):
    # Start a ThreadedEchoServer for this test, remember its address and
    # register its __exit__ as a cleanup.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001726
def test_connect(self):
    # Without verification the peer certificate is reported as empty
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
        self.assertFalse(client.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as client:
        client.connect(self.server_addr)
        self.assertTrue(client.getpeercert())
        self.assertFalse(client.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001741
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001751
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    status = client.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001760
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [client], [], 5.0)
    # Non-blocking handshake: retry after waiting for whichever direction
    # the SSL layer says it needs.
    while True:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            break
    # SSL established
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001786
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrap_new_socket(**options):
        return context.wrap_socket(socket.socket(socket.AF_INET), **options)

    with wrap_new_socket() as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
    # Same with a server hostname
    with wrap_new_socket(server_hostname="dummy") as client:
        client.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrap_new_socket() as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
1804
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001815
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The first pass uses CAPATH; the second repeats the check with
    # BYTES_CAPATH, the bytes form of the `capath` argument.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001837
def test_connect_cadata(self):
    # Verify the server certificate against CA data passed in memory,
    # first as PEM text and then as the equivalent DER bytes.
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for cadata in (pem, der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        # TODO: fix TLSv1.3 support
        context.options |= ssl.OP_NO_TLSv1_3
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001862
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    tls_socket = test_wrap_socket(socket.socket(socket.AF_INET))
    tls_socket.connect(self.server_addr)
    descriptor = tls_socket.fileno()
    stream = tls_socket.makefile()
    stream.close()
    # The fd is still open
    os.read(descriptor, 0)
    # Closing the SSL socket should close the fd too
    tls_socket.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(descriptor, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001881
def test_non_blocking_handshake(self):
    # Connect a plain socket first, switch it to non-blocking mode, then
    # drive the TLS handshake by hand.
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    client = test_wrap_socket(plain,
                              cert_reqs=ssl.CERT_NONE,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    attempts = 0
    while True:
        # Every call counts, including the one that finally succeeds.
        attempts += 1
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [client], [])
        else:
            break
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001902
Antoine Pitrou480a1242010-04-28 21:37:09 +00001903 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001904 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001905
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 def test_get_server_certificate_fail(self):
1907 # Connection failure crashes ThreadedEchoServer, so run this in an
1908 # independent test method
1909 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001910
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001911 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001912 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001913 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1914 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001915 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001916 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1917 s.connect(self.server_addr)
1918 # Error checking can happen at instantiation or when connecting
1919 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1920 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001921 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001922 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1923 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001924
Christian Heimes9a5395a2013-06-17 15:44:12 +02001925 def test_get_ca_certs_capath(self):
1926 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001927 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001928 ctx.load_verify_locations(capath=CAPATH)
1929 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001930 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1931 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001932 s.connect(self.server_addr)
1933 cert = s.getpeercert()
1934 self.assertTrue(cert)
1935 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001936
Christian Heimes575596e2013-12-15 21:49:17 +01001937 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001938 def test_context_setget(self):
1939 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001940 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1941 ctx1.load_verify_locations(capath=CAPATH)
1942 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1943 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001944 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001945 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001946 ss.connect(self.server_addr)
1947 self.assertIs(ss.context, ctx1)
1948 self.assertIs(ss._sslobj.context, ctx1)
1949 ss.context = ctx2
1950 self.assertIs(ss.context, ctx2)
1951 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001952
1953 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1954 # A simple IO loop. Call func(*args) depending on the error we get
1955 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1956 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001957 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001958 count = 0
1959 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001960 if time.monotonic() > deadline:
1961 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001962 errno = None
1963 count += 1
1964 try:
1965 ret = func(*args)
1966 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001967 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001968 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001969 raise
1970 errno = e.errno
1971 # Get any data from the outgoing BIO irrespective of any error, and
1972 # send it to the socket.
1973 buf = outgoing.read()
1974 sock.sendall(buf)
1975 # If there's no error, we're done. For WANT_READ, we need to get
1976 # data from the socket and put it in the incoming BIO.
1977 if errno is None:
1978 break
1979 elif errno == ssl.SSL_ERROR_WANT_READ:
1980 buf = sock.recv(32768)
1981 if buf:
1982 incoming.write(buf)
1983 else:
1984 incoming.write_eof()
1985 if support.verbose:
1986 sys.stdout.write("Needed %d calls to complete %s().\n"
1987 % (count, func.__name__))
1988 return ret
1989
Martin Panter3840b2a2016-03-27 01:53:46 +00001990 def test_bio_handshake(self):
1991 sock = socket.socket(socket.AF_INET)
1992 self.addCleanup(sock.close)
1993 sock.connect(self.server_addr)
1994 incoming = ssl.MemoryBIO()
1995 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001996 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1997 self.assertTrue(ctx.check_hostname)
1998 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001999 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002000 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2001 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002002 self.assertIs(sslobj._sslobj.owner, sslobj)
2003 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002004 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002005 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002006 self.assertRaises(ValueError, sslobj.getpeercert)
2007 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2008 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2009 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2010 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002011 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002012 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002013 self.assertTrue(sslobj.getpeercert())
2014 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2015 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2016 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002017 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002018 except ssl.SSLSyscallError:
2019 # If the server shuts down the TCP connection without sending a
2020 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2021 pass
2022 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2023
2024 def test_bio_read_write_data(self):
2025 sock = socket.socket(socket.AF_INET)
2026 self.addCleanup(sock.close)
2027 sock.connect(self.server_addr)
2028 incoming = ssl.MemoryBIO()
2029 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002030 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002031 ctx.verify_mode = ssl.CERT_NONE
2032 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2033 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2034 req = b'FOO\n'
2035 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2036 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2037 self.assertEqual(buf, b'foo\n')
2038 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002039
2040
class NetworkedTests(unittest.TestCase):
    """Tests that talk to hosts on the real network."""

    def test_timeout_connect_ex(self):
        """connect_ex() reports the original errno when it times out.

        Issue #12065: on a timeout, connect_ex() should return the original
        errno (mimicking the behaviour of non-SSL sockets).
        """
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A tiny timeout so that the connection attempt cannot finish.
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run both certificate-retrieval helpers against an IPv6 host."""
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2062
Martin Panter3840b2a2016-03-27 01:53:46 +00002063
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port without and with CA validation.

    The certificate is requested twice: first with default arguments, then
    with ``ca_certs=cert``.  *test* (a TestCase) is failed if either call
    returns an empty result.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2074
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching host:port's certificate with CERTFILE as CA fails.

    An ssl.SSLError is the expected outcome; if a certificate is returned
    instead, *test* (a TestCase) is failed.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the outcome we want.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2084
2085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002086from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002087
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a background thread, for use by the tests.

    The server accepts one connection at a time on a port obtained from
    support.bind_port() (stored in ``self.port``) and hands it to a
    ConnectionHandler thread, which it joins before accepting again.
    Use it as a context manager: entering starts the thread and waits
    until it is listening, leaving stops and joins it.

    Per-connection results are collected in the lists
    ``selected_npn_protocols``, ``selected_alpn_protocols``,
    ``shared_ciphers`` and ``conn_errors``.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is plaintext, the SSLSocket otherwise.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's context (server side).

            Returns True on success.  On failure the error text is recorded
            in server.conn_errors, the whole server is stopped, the
            connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS layer if active, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS layer if active, else to the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS socket if active, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an error.

            Recognised commands (after stripping whitespace): b'over',
            b'STARTTLS' and b'ENDTLS' (STARTTLS servers only) and
            b'CB tls-unique'.  Anything else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # On a STARTTLS server the peer may go away while
                        # the connection is still plaintext; there is then
                        # no TLS layer to unwrap and calling unwrap() on
                        # None would kill this thread with AttributeError
                        # before the socket gets closed.
                        if self.sslconn:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the listening socket and the SSL context.

        If *context* is given it is used as is and *ssl_version*,
        *certreqs*, *cacerts* and *certificate* are ignored; otherwise a
        context is built from them (defaulting to PROTOCOL_TLS_SERVER and
        CERT_NONE).  *npn_protocols*, *alpn_protocols* and *ciphers* are
        applied to the context in either case.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional threading.Event set by run() once the server is listening.
        self.flag = None
        self.active = False
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag* (an Event) is set once it is listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections one at a time until stopped."""
        # The short timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop to terminate."""
        self.active = False
2306
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore.dispatcher, run in a background thread.

    Used as a context manager: entering starts the thread and waits for
    it to be running, leaving stops it, joins it and clears asyncore's
    socket map.
    """

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that spawns a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of one connection: handshake, then echo lower-cased."""

            def __init__(self, conn, certfile):
                wrapped = test_wrap_socket(conn, server_side=True,
                                           certfile=certfile,
                                           do_handshake_on_connect=False)
                self.socket = wrapped
                super().__init__(wrapped)
                # True until the TLS handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data already buffered inside the SSL layer first.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting once done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; we will be called again.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed explicitly so that the OSError clause below
                    # does not get to see other SSL errors.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            super().__init__(listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        # Optional threading.Event set by run() once the loop is active.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        super().__init__()
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        def report(text):
            if support.verbose:
                sys.stdout.write(text)

        report(" cleanup: stopping server.\n")
        self.stop()
        report(" cleanup: joining server thread.\n")
        self.join()
        report(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag* (an Event) is set once it is running."""
        self.flag = flag
        super().start()

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Best effort: whatever asyncore.loop() raises is ignored and
            # the loop is simply re-entered until stop() is called.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        """Terminate the polling loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002424
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must
    equal ``indata.lower()`` or AssertionError is raised.  Returns a dict
    with the client's view of the connection (cipher, version, session,
    ...) plus the protocols and shared ciphers recorded by the server.
    """
    def client_log(template, *values):
        # Client-side chatter needs both connectionchatty and verbose mode.
        if connectionchatty:
            if support.verbose:
                sys.stdout.write(template % values)

    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name,
                                            session=session)
        with client as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                client_log(" client: sending %r...\n", indata)
                s.write(payload)
                outdata = s.read()
                client_log(" client: read %r\n", outdata)
                if outdata != expected():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            client_log(" client: closing connection.\n")
            stats.update(
                compression=s.compression(),
                cipher=s.cipher(),
                peercert=s.getpeercert(),
                client_alpn_protocol=s.selected_alpn_protocol(),
                client_npn_protocol=s.selected_npn_protocol(),
                version=s.version(),
                session_reused=s.session_reused,
                session=s.session,
            )
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
    stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002474
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2533
2534
2535class ThreadedTests(unittest.TestCase):
2536
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002537 def test_echo(self):
2538 """Basic test of an SSL client connecting to a server"""
2539 if support.verbose:
2540 sys.stdout.write("\n")
2541 for protocol in PROTOCOLS:
2542 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2543 continue
2544 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2545 context = ssl.SSLContext(protocol)
2546 context.load_cert_chain(CERTFILE)
2547 server_params_test(context, context,
2548 chatty=True, connectionchatty=True)
2549
Christian Heimesa170fa12017-09-15 20:27:30 +02002550 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002551
2552 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2553 server_params_test(client_context=client_context,
2554 server_context=server_context,
2555 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002556 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557
2558 client_context.check_hostname = False
2559 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2560 with self.assertRaises(ssl.SSLError) as e:
2561 server_params_test(client_context=server_context,
2562 server_context=client_context,
2563 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002564 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002565 self.assertIn('called a function you should not call',
2566 str(e.exception))
2567
2568 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2569 with self.assertRaises(ssl.SSLError) as e:
2570 server_params_test(client_context=server_context,
2571 server_context=server_context,
2572 chatty=True, connectionchatty=True)
2573 self.assertIn('called a function you should not call',
2574 str(e.exception))
2575
2576 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2577 with self.assertRaises(ssl.SSLError) as e:
2578 server_params_test(client_context=server_context,
2579 server_context=client_context,
2580 chatty=True, connectionchatty=True)
2581 self.assertIn('called a function you should not call',
2582 str(e.exception))
2583
def test_getpeercert(self):
    # getpeercert() must refuse to answer before the handshake has run,
    # and must hand back a populated certificate dict afterwards.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)
    with echo_server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as conn:
        conn.connect((HOST, echo_server.port))
        # The handshake was deferred (do_handshake_on_connect=False), so
        # asking for the peer certificate now has to raise ValueError.
        with self.assertRaises(ValueError):
            conn.getpeercert()
        conn.do_handshake()

        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
        negotiated = conn.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(peer_cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(negotiated) + '.\n')

        if 'subject' not in peer_cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(peer_cert))
        # One relative distinguished name of the subject, in the nested
        # tuple form used by the dict that getpeercert() returns.
        wanted_rdn = (('organizationName', 'Python Software Foundation'),)
        if wanted_rdn not in peer_cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # The validity window must be present and correctly ordered.
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, peer_cert)
        not_before = ssl.cert_time_to_seconds(peer_cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(peer_cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002619
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Walk one client context through three stages:
    #   1. default verify flags           -> handshake succeeds
    #   2. CRL leaf check, no CRL loaded  -> handshake fails
    #   3. CRL leaf check, CRL loaded     -> handshake succeeds again
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def connect_and_require_cert():
        # Start a fresh echo server, connect to it and insist that a
        # peer certificate is available once connected.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                client_context.wrap_socket(socket.socket(),
                                           server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_X509_TRUSTED_FIRST is looked up with a fallback of 0, so the
    # expected default is computed the same way whether or not the ssl
    # module defines it.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    connect_and_require_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    connect_and_require_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002661
def test_check_hostname(self):
    # Exercise hostname checking on the client context: a matching
    # name, a mismatching name, and no name at all.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    good_server = ThreadedEchoServer(context=server_context, chatty=True)
    with good_server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as conn:
        conn.connect((HOST, good_server.port))
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    mismatch_server = ThreadedEchoServer(context=server_context, chatty=True)
    with mismatch_server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as conn:
        expected_error = (
            "Hostname mismatch, certificate is not valid for 'invalid'.")
        with self.assertRaisesRegex(ssl.CertificateError, expected_error):
            conn.connect((HOST, mismatch_server.port))

    # missing server_hostname arg should cause an exception, too
    # (raised by wrap_socket() itself, before any connection is made)
    unused_server = ThreadedEchoServer(context=server_context, chatty=True)
    with unused_server, socket.socket() as plain_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(plain_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002694
def test_ecc_cert(self):
    # Handshake against a server that only has an ECC certificate, using
    # a client restricted to ECDSA-authenticated (non-RSA) cipher suites.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers(
        'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
        'ECDHE:ECDSA:!NULL:!aRSA'
    )
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple as
            # the failure *message* and so could never fail.  Compare for
            # real, but only below TLS 1.3: TLS 1.3 suite names (such as
            # the TLS13-* names allowed above) do not start with the key
            # exchange and authentication algorithms.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2718
def test_dual_rsa_ecc(self):
    # The server carries both an ECC and an RSA key/cert pair; a client
    # that only offers ECDSA-authenticated suites must end up on ECDSA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    # algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) used the tuple as
            # the failure *message* and so could never fail.  str.split()
            # returns a list, hence the list on the right-hand side.
            # TLS 1.3 is disabled above, so the suite name is expected to
            # begin with the key exchange and authentication algorithms.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2744
def test_check_hostname_idn(self):
    # Hostname checking with internationalized domain names: whatever
    # form the caller passes as server_hostname (unicode, A-label str,
    # A-label bytes), SSLSocket.server_hostname must report the A-label.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)
    # TODO: fix TLSv1.3 support
    server_context.options |= ssl.OP_NO_TLSv1_3

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # The ASCII-compatible (A-label) spellings of the test names.
    ace_plain = 'xn--knig-5qa.idn.pythontest.net'
    ace_2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    ace_2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'

    # correct hostname should verify, when specified in several
    # different ways: (value passed in, value expected back)
    idn_hostnames = [
        ('könig.idn.pythontest.net', ace_plain),
        (ace_plain, ace_plain),
        (ace_plain.encode('ascii'), ace_plain),

        ('königsgäßchen.idna2003.pythontest.net', ace_2003),
        (ace_2003, ace_2003),
        (ace_2003.encode('ascii'), ace_2003),

        # The unicode spelling of the idna2008 name is intentionally
        # left out (it was disabled in the original list as well):
        # ('königsgäßchen.idna2008.pythontest.net', ace_2008),
        (ace_2008, ace_2008),
        (ace_2008.encode('ascii'), ace_2008),
    ]
    for given_name, wanted_name in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=given_name) as s:
            # Checked both before and after the handshake.
            self.assertEqual(s.server_hostname, wanted_name)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, wanted_name)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2802
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    client_context, server_context, hostname = testing_context()
    # The client presents WRONG_CERT as its certificate ...
    client_context.load_cert_chain(WRONG_CERT)
    # ... and the server insists on TLS client authentication.
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TODO: fix TLSv1.3 support
    # With TLS 1.3, test fails with exception in server thread
    server_context.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=True)

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # Expect either an SSL error about the server rejecting
            # the connection, or a low-level connection reset (which
            # sometimes happens on Windows)
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as ssl_exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_exc)
            except OSError as os_exc:
                if os_exc.errno == errno.ECONNRESET:
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % os_exc)
                else:
                    # Any other OS-level failure is unexpected.
                    raise
            else:
                self.fail("Use of invalid cert should have failed!")
2840
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread: accept one connection, drop it at once
        # together with the listening socket, then announce via
        # `listener_gone` that nothing is left on the server side.
        listen_sock.listen()
        listener_ready.set()
        accepted = listen_sock.accept()[0]
        accepted.close()
        listen_sock.close()
        listener_gone.set()

    def connector():
        # Runs in the main thread: connect in clear text, wait until
        # the peer has gone away, then attempt to wrap the socket.
        listener_ready.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            listener_gone.wait()
            try:
                test_wrap_socket(client)
            except OSError:
                return
            self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002881
def test_ssl_cert_verify_error(self):
    # A verification failure must surface as SSLCertVerificationError
    # carrying the verify code and message.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # No CA certificates are loaded into the client context here, which
    # is what the 'unable to get local issuer certificate' expectation
    # below relies on.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises (instead of a bare try/except) makes the test
            # fail if the handshake unexpectedly succeeds; previously
            # that case passed without checking anything.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
2904
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome, cert reqs).
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 client against the SSLv2 server, with each cert requirement.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_reqs)

    # Clients speaking any other protocol are expected to fail.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02002927
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome, cert reqs).
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Default certificate requirements first ...
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls, ssl.PROTOCOL_TLS, True)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1')

    # ... then the same three clients with explicit requirements.
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, cert_reqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLS, True, cert_reqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', cert_reqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, ssl.PROTOCOL_TLS, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2965
2966
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome, cert reqs).
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 client against the SSLv3 server, with each cert requirement.
    try_protocol_combo(sslv3, sslv3, 'SSLv3')
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', cert_reqs)

    # Every other client configuration is expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
2985
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome, cert reqs).
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 client against the TLSv1 server, with each cert requirement.
    try_protocol_combo(tlsv1, tlsv1, 'TLSv1')
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', cert_reqs)

    # Legacy SSL clients are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)

    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
2999
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome).
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Legacy SSL clients are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # Auto-negotiating server with a TLSv1.1-only client.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # TLSv1.1 and TLSv1 must not interoperate in either direction.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3018
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo() is defined elsewhere in the
    # file; judging by the call sites its arguments are
    # (server protocol, client protocol, expected outcome).
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy SSL clients are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # Auto-negotiating server with a TLSv1.2-only client.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with older fixed TLS versions, in
    # either direction.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3041
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    script = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
              b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # `tls` is the SSL-wrapped socket while the session is encrypted and
    # None while the conversation runs in clear text over `plain`.
    tls = None
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for request in script:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % request)
            if tls is None:
                plain.send(request)
                reply = plain.recv(1024)
            else:
                tls.write(request)
                reply = tls.read()
            status = reply.strip().lower()
            acknowledged = status.startswith(b"ok")
            if acknowledged and request == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % status)
                tls = test_wrap_socket(plain)
            elif acknowledged and request == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % status)
                plain = tls.unwrap()
                tls = None
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % status)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye over whichever channel is currently active, then
        # close that same channel.
        if tls is None:
            plain.send(b"over\n")
            plain.close()
        else:
            tls.write(b"over\n")
            tls.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003098
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    # Reference copy of the file, read straight from disk.
    with open(CERTFILE, 'rb') as local_file:
        expected = local_file.read()
    fetched = ''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if dlen and (int(dlen) > 0):
            fetched = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    self.assertEqual(expected, fetched)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003124
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    if support.verbose:
        sys.stdout.write("\n")

    request = b"FOO\n"
    # The reply is compared against the lower-cased request below.
    wanted = request.lower()
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % request)
        client.write(request)
        reply = client.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % reply)
        if reply != wanted:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (reply[:20], len(reply),
                   request[:20].lower(), len(request)))
        client.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        client.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003153
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures:
        # both return the received bytes instead of a byte count.
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE on the failure messages below: they use the conversions
        # "!r" / "!s".  The former ":r" / ":s" were format *specs*, which
        # bytes and exception objects reject with TypeError, so a real
        # failure was reported as a formatting error instead.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                # The reply is compared against the lower-cased payload.
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                # Methods flagged expect_success=False must raise a
                # ValueError whose text starts with the method name.
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data (the payload was sent above, but the
                # rejected receive call never read the reply)
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        # Negative sizes are rejected.
        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3289
def test_recv_zero(self):
    # Zero-length reads must return empty results, leave pending data
    # untouched, and never block.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)

    raw_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(raw_sock.close)
    tls_sock = test_wrap_socket(raw_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # Asking for zero bytes yields nothing; the echoed payload is still
    # there for the full read that follows.
    payload = b"data"
    tls_sock.send(payload)
    for zero_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_read(0), b"")
    self.assertEqual(tls_sock.read(), payload)

    # With nothing sent by the peer and the socket non-blocking, the
    # zero-length variants must still return immediately.
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    empty_buffer = bytearray()
    self.assertEqual(tls_sock.recv_into(empty_buffer), 0)
3309
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise SSLWantWriteError or
    # SSLWantReadError from send() instead of blocking.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, server.port))
        client.setblocking(False)

        # Nothing is read back on this side, so if we keep sending the
        # buffers are expected to fill up; at that point send() must
        # raise one of the "want" errors rather than block.
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Switch back to blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
3339
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        # Accept plain TCP connections and never answer them, so that a
        # client-side TLS handshake has nothing to complete against.
        listener.listen()
        listening.set()
        accepted = []
        while not finish:
            readable = select.select([listener], [], [], 0.1)[0]
            if listener in readable:
                # Keep a reference so the connection stays open rather
                # than being closed by garbage collection.
                conn, _peer = listener.accept()
                accepted.append(conn)
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrap an already-connected socket; the handshake done
        # while wrapping must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect; the handshake done as part
        # of connect() must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell the server loop to exit, then wait for it before closing
        # the listening socket it is polling.
        finish = True
        worker.join()
        listener.close()
3388
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)

    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Signal readiness, then block in accept() and wait for the
        # connection to close.
        ready.set()
        remote, peer = listener.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits for the server to be set up before connecting.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks: accept() returned an SSL socket together with the
    # address the client actually connected from.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3427
3428 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003429 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003430 with context.wrap_socket(socket.socket()) as sock:
3431 with self.assertRaises(OSError) as cm:
3432 sock.getpeercert()
3433 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3434
3435 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003436 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003437 with context.wrap_socket(socket.socket()) as sock:
3438 with self.assertRaises(OSError) as cm:
3439 sock.do_handshake()
3440 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3441
def test_default_ciphers(self):
    # A client restricted to DES ciphers must be refused by a server
    # using its default cipher list.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLS,
                                chatty=False)
    with server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Inspect the server-side errors only once the server context has
    # been exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3456
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is no SSL object and therefore no
            # negotiated version.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # Only expect TLSv1.3 on a genuine OpenSSL >= 1.1.1 that was
            # built with TLS 1.3.  Comparing OPENSSL_VERSION_INFO alone
            # would also match LibreSSL, whose version numbers start at
            # 2.x, and OpenSSL builds with TLS 1.3 disabled.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Once the socket is closed the SSL object is gone again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3480
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0, 1.1 and 1.2 disabled on a shared client/server
    # context, the connection must negotiate TLSv1.3 with one of the
    # TLS 1.3 cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    context.options |= (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], {
                # "TLS13-*" spelling originally accepted by this test.
                'TLS13-AES-256-GCM-SHA384',
                'TLS13-CHACHA20-POLY1305-SHA256',
                'TLS13-AES-128-GCM-SHA256',
                # Standard TLS 1.3 suite names; accepted as well so the
                # test does not depend on which spelling the linked
                # OpenSSL reports.
                'TLS_AES_256_GCM_SHA384',
                'TLS_CHACHA20_POLY1305_SHA256',
                'TLS_AES_128_GCM_SHA256',
            })
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003498
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # The negotiated protocol must honour the minimum_version and
    # maximum_version limits set on both contexts.
    client_context, server_context, hostname = testing_context()
    versions = ssl.TLSVersion

    # client TLSv1.0 to 1.2
    client_context.minimum_version = versions.TLSv1
    client_context.maximum_version = versions.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = versions.TLSv1_2
    server_context.maximum_version = versions.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        self.assertEqual(s.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = versions.TLSv1
    server_context.maximum_version = versions.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        self.assertEqual(s.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = versions.TLSv1_2
    server_context.maximum_version = versions.TLSv1_2
    client_context.minimum_version = versions.TLSv1
    client_context.maximum_version = versions.TLSv1
    with ThreadedEchoServer(context=server_context) as server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        with self.assertRaises(ssl.SSLError) as e:
            s.connect((HOST, server.port))
        # No common version: the failure must be reported as an alert.
        self.assertIn("alert", str(e.exception))
3537
3538
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 must negotiate SSLv3 with a server whose
    # minimum version has been lowered to SSLv3.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        self.assertEqual(s.version(), 'SSLv3')
3552
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3572
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    # TODO: fix TLSv1.3 support
    client_context.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the binding of *this* connection; the first
            # connection's data was already validated above.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003628
def test_compression(self):
    # The reported compression method must be one of the known values
    # (or None when no compression is in use).
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    self.assertIn(compression, {None, 'ZLIB', 'RLE'})
3637
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both sides, the connection must
    # report no compression.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3648
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is indexed for the cipher name; split it into its
    # dash-separated components to look for a DH key-exchange marker.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; `cipher` is already the name
        # string, so indexing it again would only show its first
        # character.
        self.fail("Non-DH cipher: " + cipher)
3664
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on either side of the connection, then
    # with mismatching curves on both sides.
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_old_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    mismatch_rejected = False
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        mismatch_rejected = True
    # OpenSSL 1.0.2 does not fail although it should.
    if not mismatch_rejected and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3703
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_ctx, server_ctx, hostname = testing_context()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3711
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_ctx, server_ctx, hostname = testing_context()
    # Only the server advertises protocols here.
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3721
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which ALPN protocol both
    # sides report against a fixed server list.
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Keep the exception in place of the stats so the version-
        # specific branch below can inspect it.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            stats = e

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_alpn_protocols']
        if server_seen:
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3761
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_ctx, server_ctx, hostname = testing_context()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
3769
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which NPN protocol both
    # sides report against a fixed server list.
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_npn_protocols']
        if server_seen:
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3795
def sni_contexts(self):
    # Build the contexts used by the SNI tests.  Returns the tuple
    # (server_context, other_context, client_context): two server-side
    # contexts carrying SIGNED_CERTFILE and SIGNED_CERTFILE2, and a
    # client context that has loaded SIGNING_CA for verification.
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_side(SIGNED_CERTFILE)
    other_context = server_side(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3804
def check_common_name(self, stats, name):
    # Assert that the subject of the peer certificate in *stats*
    # contains an entry made up solely of commonName == *name*.
    subject = stats['peercert']['subject']
    wanted_entry = (('commonName', name),)
    self.assertIn(wanted_entry, subject)
3808
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name and the
    # initial context, and may switch the connection to another context.
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The callback switched to other_context, so the certificate loaded
    # there (SIGNED_CERTFILE2) was presented.
    self.check_common_name(stats, 'fakehostname')

    calls.clear()
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    calls.clear()
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(calls, [])
3849
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        alert = ssl.ALERT_DESCRIPTION_ACCESS_DENIED
        return alert

    server_context.set_servername_callback(cb_returning_alert)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003863
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(cb_raising)

    # Capture stderr around the handshake so the exception report
    # written there can be checked afterwards.
    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003880
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    # Capture stderr around the handshake so the TypeError report
    # written there can be checked afterwards.
    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003898
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must belong to the
    # family the server was restricted to.
    client_context, server_context, hostname = testing_context()
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, tls_version, bits in ciphers:
        # A name is acceptable if it has alg1 as a dash-separated
        # component or contains alg2 as a substring.
        if alg1 in name.split("-") or alg2 in name:
            continue
        self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003919
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on a closed SSL socket must raise ValueError.
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        closed_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        closed_sock.connect((HOST, server.port))
        closed_sock.close()

        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003932
def test_sendfile(self):
    # Data sent with sendfile() over an SSL socket must come back
    # unchanged from the echo server.
    TEST_DATA = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(TEST_DATA)
    self.addCleanup(support.unlink, support.TESTFN)

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=ctx, chatty=False)
    with server, ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as file:
            s.sendfile(file)
            echoed = s.recv(1024)
            self.assertEqual(echoed, TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003949
def test_session(self):
    """Check session creation, reuse, and the server's session counters.

    Four handshakes run in sequence (fresh, reused, fresh, reused).  After
    each one the returned session object and the 'accept' / 'hits' entries
    of server_context.session_stats() are verified.
    """
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server exchange; ``extra`` may carry ``session=...``.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 1)
    self.assertEqual(counters['hits'], 0)

    # reuse session
    second = handshake(session=session)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 2)
    self.assertEqual(counters['hits'], 1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    # Equal by value, yet a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 3)
    self.assertEqual(counters['hits'], 1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    counters = server_context.session_stats()
    self.assertEqual(counters['accept'], 4)
    self.assertEqual(counters['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004007
def test_session_handling(self):
    """Check the rules for reading and assigning SSLSocket.session.

    Covers: session values before the handshake, rejection of a
    non-session value, rejection of assignment after the handshake,
    successful reuse when assigned before connecting, and rejection of a
    session that belongs to a different SSLContext.
    """
    client_context, server_context, hostname = testing_context()
    other_client_context, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    other_client_context.options |= ssl.OP_NO_TLSv1_3

    def new_client(ctx):
        # Fresh, not yet connected client socket wrapped by ``ctx``.
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        address = (HOST, server.port)

        with new_client(client_context) as sock:
            # session is None before handshake
            self.assertEqual(sock.session, None)
            self.assertEqual(sock.session_reused, None)
            sock.connect(address)
            session = sock.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as caught:
                sock.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with new_client(client_context) as sock:
            sock.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                sock.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with new_client(client_context) as sock:
            # can set session before handshake and before the
            # connection was established
            sock.session = session
            sock.connect(address)
            self.assertEqual(sock.session.id, session.id)
            self.assertEqual(sock.session, session)
            self.assertEqual(sock.session_reused, True)

        with new_client(other_client_context) as sock:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as caught:
                sock.session = session
                sock.connect(address)
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004057
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004058
def test_main(verbose=False):
    """Run the SSL test classes, optionally printing environment details.

    verbose: when true, or when support.verbose is set, print the OpenSSL
        version, the detected platform and a few ssl module constants
        before the tests run.

    Raises support.TestFailed if one of the certificate fixture files
    listed below does not exist.
    """
    if verbose or support.verbose:
        plats = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        for name, func in plats.items():
            plat = func()
            if plat and plat[0]:
                plat = '%s %r' % (name, plat)
                break
        else:
            # No platform-specific probe reported a version.
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not available in this build; skip the line.
            pass

    # Fail early, with a clear message, if a fixture file is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    # Tests that need external network access run only when enabled.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004104
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()