blob: d80e8d3fcfc6e6e40ca34a719527eb6b1bd623a2 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
# Protocol identifiers known to the ssl module, in sorted order.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags derived from the version information reported by the ssl module.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build configuration variable as reported by sysconfig.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file stored alongside this module.

    The components given in *name* are joined onto the directory that
    contains this file, e.g. ``data_file("capath", "4e1295a3.0")``.
    """
    return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
WRONG_CERT = data_file("wrongcert.pem")

# Expected result of decoding CERTFILE (compared in test_parse_cert).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Jan 17 19:09:06 2028 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': 'F9BA076D5B6ABD9B',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'

# Expected result of decoding SIGNED_CERTFILE (compared in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Nov 28 19:09:06 2027 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': '82EDBF41C880919C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)

# Not defined in all versions of OpenSSL; fall back to 0 when missing.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    Nothing is written unless ``support.verbose`` is true; the traceback
    is only formatted in that case.
    """
    if support.verbose:
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True if the OpenSSL version is 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
144
def have_verify_flags():
    """Return True if the OpenSSL version is 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
148
def _have_secp_curves():
    """Return True if the "secp384r1" ECDH curve can be selected.

    Returns False when ssl.HAS_ECDH is false or when
    SSLContext.set_ecdh_curve() rejects the curve with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        ctx.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    else:
        return True
159
160
# Computed once at import time by probing an SSLContext.
HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    Uses the DST offset (time.altzone) when the zone defines DST and
    the current local time reports DST in effect; otherwise uses
    time.timezone.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
169
def asn1time(cert_time):
    """Return *cert_time* adjusted for the OpenSSL build in use.

    On OpenSSL API version 0.9.8i the seconds field is zeroed and a
    leading zero in the day-of-month is replaced by a space; on any
    other version the string is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on a known-broken patched OpenSSL.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned
    unchanged.  Otherwise the wrapper tries to create an SSLv2 context;
    if that raises SSLError on OpenSSL 0.9.8o and the platform reports
    ('debian', 'squeeze/sid', ''), unittest.SkipTest is raised.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # platform.linux_distribution() no longer exists on newer
            # Python versions; without it the distribution cannot be
            # identified, so the test is simply not skipped.
            linux_distribution = getattr(platform, 'linux_distribution', None)
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    linux_distribution is not None and
                    linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext and return the result.

    A context is created for *ssl_version* and configured from the
    keyword arguments: *cert_reqs* sets verify_mode (hostname checking
    is switched off first when it is CERT_NONE), *ca_certs* is passed
    to load_verify_locations(), *certfile*/*keyfile* to
    load_cert_chain(), and *ciphers* to set_ciphers().  Remaining
    keyword arguments are forwarded to SSLContext.wrap_socket().
    """
    context = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        if cert_reqs == ssl.CERT_NONE:
            # Must be disabled before verify_mode is set to CERT_NONE.
            context.check_hostname = False
        context.verify_mode = cert_reqs
    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    if certfile is not None or keyfile is not None:
        context.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching client context, server context and hostname.

    Usage:

        client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  Both contexts load SIGNING_CA as a trusted
    CA, and the server context loads *server_cert* as its certificate
    chain.  The returned hostname is the one associated with the chosen
    certificate.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # The server needs the CA as well (e.g. to verify client certificates);
    # previously the client context was loaded a second time by mistake.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
def test_constants(self):
    # Accessing each attribute is the test: a missing constant raises
    # AttributeError.
    ssl.CERT_NONE
    ssl.CERT_OPTIONAL
    ssl.CERT_REQUIRED
    ssl.OP_CIPHER_SERVER_PREFERENCE
    ssl.OP_SINGLE_DH_USE
    if ssl.HAS_ECDH:
        ssl.OP_SINGLE_ECDH_USE
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        ssl.OP_NO_COMPRESSION
    self.assertIn(ssl.HAS_SNI, {True, False})
    self.assertIn(ssl.HAS_ECDH, {True, False})
    ssl.OP_NO_SSLv2
    ssl.OP_NO_SSLv3
    ssl.OP_NO_TLSv1
    ssl.OP_NO_TLSv1_3
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        ssl.OP_NO_TLSv1_1
        ssl.OP_NO_TLSv1_2
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
def test_private_init(self):
    # Instantiating SSLSocket directly must be rejected with TypeError.
    with self.assertRaisesRegex(TypeError, "public constructor"):
        with socket.socket() as s:
            ssl.SSLSocket(s)
270
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs.
    proto = ssl.PROTOCOL_TLS
    self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
    ctx = ssl.SSLContext(proto)
    self.assertIs(ctx.protocol, proto)
278
def test_random(self):
    # Exercise the RAND_* functions exposed by the ssl module.
    v = ssl.RAND_status()
    if support.verbose:
        description = ("sufficient randomness" if v
                       else "insufficient randomness")
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, description))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if v:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)
    else:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)

    # negative num is invalid
    self.assertRaises(ValueError, ssl.RAND_bytes, -5)
    self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    ssl.RAND_add("this is a random string", 75.0)
    ssl.RAND_add(b"this is a random bytes object", 75.0)
    ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # After fork(), parent and child must not produce the same
    # pseudo-random bytes.
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send 16 random bytes through the pipe, then exit
        # without returning into the test runner.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent: wait for the child and compare its bytes with ours.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
337
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    self.assertEqual(
        ssl._ssl._test_decode_cert(CERTFILE),
        CERTFILE_INFO
    )
    self.assertEqual(
        ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
        SIGNED_CERTFILE_INFO
    )

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                    )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate fields must be preserved in the
    # decoded values rather than truncating them.
    p = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    subject = ((('countryName', 'US'),),
               (('stateOrProvinceName', 'Oregon'),),
               (('localityName', 'Beaverton'),),
               (('organizationName', 'Python Software Foundation'),),
               (('organizationalUnitName', 'Python Core Development'),),
               (('commonName', 'null.python.org\x00example.org'),),
               (('emailAddress', 'python-dev@python.org'),))
    self.assertEqual(p['subject'], subject)
    self.assertEqual(p['issuer'], subject)

    # Entries expected regardless of the OpenSSL version.
    common_san = (('DNS', 'altnull.python.org\x00example.com'),
                  ('email', 'null@python.org\x00user@example.org'),
                  ('URI', 'http://null.python.org\x00http://example.org'),
                  ('IP Address', '192.0.2.1'))
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        san = common_san + (('IP Address', '2001:DB8:0:0:0:0:0:1\n'),)
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        san = common_san + (('IP Address', '<invalid>'),)

    self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200395
def test_parse_all_sans(self):
    # ALLSANFILE carries every kind of subjectAltName entry; check how
    # each one is decoded.
    p = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(p['subjectAltName'],
        (
            ('DNS', 'allsans'),
            ('othername', '<unsupported>'),
            ('othername', '<unsupported>'),
            ('email', 'user@example.org'),
            ('DNS', 'www.example.org'),
            ('DirName',
                ((('countryName', 'XY'),),
                 (('localityName', 'Castle Anthrax'),),
                 (('organizationName', 'Python Software Foundation'),),
                 (('commonName', 'dirname example'),))),
            ('URI', 'https://www.python.org/'),
            ('IP Address', '127.0.0.1'),
            ('IP Address', '0:0:0:0:0:0:0:1\n'),
            ('Registered ID', '1.2.3.4.5')
        )
    )
416
def test_DER_to_PEM(self):
    # PEM -> DER -> PEM -> DER must be stable, and the regenerated PEM
    # must carry the standard header and footer lines.
    with open(CAFILE_CACERT, 'r') as f:
        pem = f.read()
    d1 = ssl.PEM_cert_to_DER_cert(pem)
    p2 = ssl.DER_cert_to_PEM_cert(d1)
    d2 = ssl.PEM_cert_to_DER_cert(p2)
    self.assertEqual(d1, d2)
    self.assertTrue(p2.startswith(ssl.PEM_HEADER + '\n'),
                    "DER-to-PEM didn't include correct header:\n%r\n" % p2)
    self.assertTrue(p2.endswith('\n' + ssl.PEM_FOOTER + '\n'),
                    "DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000428
def test_openssl_version(self):
    # Sanity-check the three representations of the library version.
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    self.assertIsInstance(n, int)
    self.assertIsInstance(t, tuple)
    self.assertIsInstance(s, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 3.0
    self.assertLess(n, 0x30000000)
    major, minor, fix, patch, status = t
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
                        (s, t, hex(n)))
    else:
        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                        (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000459
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    s = socket.socket(socket.AF_INET)
    ss = test_wrap_socket(s)
    wr = weakref.ref(ss)
    # Dropping the last reference to the unclosed socket emits a
    # ResourceWarning, which is expected here.
    with support.check_warnings(("", ResourceWarning)):
        del ss
    self.assertIsNone(wr())
Antoine Pitrou9d543662010-04-23 23:10:32 +0000470
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    s = socket.socket(socket.AF_INET)
    with test_wrap_socket(s) as ss:
        self.assertRaises(OSError, ss.recv, 1)
        self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
        self.assertRaises(OSError, ss.recvfrom, 1)
        self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
        self.assertRaises(OSError, ss.send, b'x')
        self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
        self.assertRaises(NotImplementedError, ss.sendmsg,
                          [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000484
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for timeout in (None, 0.0, 5.0):
        s = socket.socket(socket.AF_INET)
        s.settimeout(timeout)
        with test_wrap_socket(s) as ss:
            self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000493
def test_errors_sslwrap(self):
    # Argument validation performed by ssl.wrap_socket().
    sock = socket.socket()
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True, certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # A missing certificate and/or key file must surface as OSError
    # with errno ENOENT.
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for kwargs in missing_file_cases:
        with self.subTest(**kwargs):
            with self.assertRaises(OSError) as cm:
                with socket.socket() as sock:
                    ssl.wrap_socket(sock, **kwargs)
            self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # *certfile* is a file name relative to this module's directory
    # (or the current directory when that is empty).
    certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
                            certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock,
                         certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000532
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    self.bad_cert_test("nullcert.pem")
536
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    self.bad_cert_test("badcert.pem")
540
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    self.bad_cert_test("badkey.pem")
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 def test_match_hostname(self):
546 def ok(cert, hostname):
547 ssl.match_hostname(cert, hostname)
548 def fail(cert, hostname):
549 self.assertRaises(ssl.CertificateError,
550 ssl.match_hostname, cert, hostname)
551
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100552 # -- Hostname matching --
553
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000554 cert = {'subject': ((('commonName', 'example.com'),),)}
555 ok(cert, 'example.com')
556 ok(cert, 'ExAmple.cOm')
557 fail(cert, 'www.example.com')
558 fail(cert, '.example.com')
559 fail(cert, 'example.org')
560 fail(cert, 'exampleXcom')
561
562 cert = {'subject': ((('commonName', '*.a.com'),),)}
563 ok(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.a.com')
565 fail(cert, 'a.com')
566 fail(cert, 'Xa.com')
567 fail(cert, '.a.com')
568
Mandeep Singhede2ac92017-11-27 04:01:27 +0530569 # only match wildcards when they are the only thing
570 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000571 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530572 fail(cert, 'foo.com')
573 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000574 fail(cert, 'bar.com')
575 fail(cert, 'foo.a.com')
576 fail(cert, 'bar.foo.com')
577
Christian Heimes824f7f32013-08-17 00:54:47 +0200578 # NULL bytes are bad, CVE-2013-4073
579 cert = {'subject': ((('commonName',
580 'null.python.org\x00example.org'),),)}
581 ok(cert, 'null.python.org\x00example.org') # or raise an error?
582 fail(cert, 'example.org')
583 fail(cert, 'null.python.org')
584
Georg Brandl72c98d32013-10-27 07:16:53 +0100585 # error cases with wildcards
586 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
592 cert = {'subject': ((('commonName', 'a.*.com'),),)}
593 fail(cert, 'a.foo.com')
594 fail(cert, 'a..com')
595 fail(cert, 'a.com')
596
597 # wildcard doesn't match IDNA prefix 'xn--'
598 idna = 'püthon.python.org'.encode("idna").decode("ascii")
599 cert = {'subject': ((('commonName', idna),),)}
600 ok(cert, idna)
601 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
602 fail(cert, idna)
603 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
604 fail(cert, idna)
605
606 # wildcard in first fragment and IDNA A-labels in sequent fragments
607 # are supported.
608 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530610 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
611 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
613 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
614
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000615 # Slightly fake real-world example
616 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
617 'subject': ((('commonName', 'linuxfrz.org'),),),
618 'subjectAltName': (('DNS', 'linuxfr.org'),
619 ('DNS', 'linuxfr.com'),
620 ('othername', '<unsupported>'))}
621 ok(cert, 'linuxfr.org')
622 ok(cert, 'linuxfr.com')
623 # Not a "DNS" entry
624 fail(cert, '<unsupported>')
625 # When there is a subjectAltName, commonName isn't used
626 fail(cert, 'linuxfrz.org')
627
628 # A pristine real-world example
629 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
630 'subject': ((('countryName', 'US'),),
631 (('stateOrProvinceName', 'California'),),
632 (('localityName', 'Mountain View'),),
633 (('organizationName', 'Google Inc'),),
634 (('commonName', 'mail.google.com'),))}
635 ok(cert, 'mail.google.com')
636 fail(cert, 'gmail.com')
637 # Only commonName is considered
638 fail(cert, 'California')
639
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100640 # -- IPv4 matching --
641 cert = {'subject': ((('commonName', 'example.com'),),),
642 'subjectAltName': (('DNS', 'example.com'),
643 ('IP Address', '10.11.12.13'),
644 ('IP Address', '14.15.16.17'))}
645 ok(cert, '10.11.12.13')
646 ok(cert, '14.15.16.17')
647 fail(cert, '14.15.16.18')
648 fail(cert, 'example.net')
649
650 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800651 if hasattr(socket, 'AF_INET6'):
652 cert = {'subject': ((('commonName', 'example.com'),),),
653 'subjectAltName': (
654 ('DNS', 'example.com'),
655 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
656 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
657 ok(cert, '2001::cafe')
658 ok(cert, '2003::baba')
659 fail(cert, '2003::bebe')
660 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661
662 # -- Miscellaneous --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Neither commonName nor subjectAltName
665 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
666 'subject': ((('countryName', 'US'),),
667 (('stateOrProvinceName', 'California'),),
668 (('localityName', 'Mountain View'),),
669 (('organizationName', 'Google Inc'),))}
670 fail(cert, 'mail.google.com')
671
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200672 # No DNS entry in subjectAltName but a commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('commonName', 'mail.google.com'),)),
678 'subjectAltName': (('othername', 'blabla'), )}
679 ok(cert, 'mail.google.com')
680
681 # No DNS entry subjectAltName and no commonName
682 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
683 'subject': ((('countryName', 'US'),),
684 (('stateOrProvinceName', 'California'),),
685 (('localityName', 'Mountain View'),),
686 (('organizationName', 'Google Inc'),)),
687 'subjectAltName': (('othername', 'blabla'),)}
688 fail(cert, 'google.com')
689
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000690 # Empty cert / no cert
691 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
692 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
693
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200694 # Issue #17980: avoid denials of service by refusing more than one
695 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800696 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "partial wildcards in leftmost label are not supported"):
700 ssl.match_hostname(cert, 'axxb.example.com')
701
702 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "wildcard can only be present in the leftmost label"):
706 ssl.match_hostname(cert, 'www.sub.example.com')
707
708 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 "too many wildcards"):
712 ssl.match_hostname(cert, 'axxbxxc.example.com')
713
714 cert = {'subject': ((('commonName', '*'),),)}
715 with self.assertRaisesRegex(
716 ssl.CertificateError,
717 "sole wildcard without additional labels are not support"):
718 ssl.match_hostname(cert, 'host')
719
720 cert = {'subject': ((('commonName', '*.com'),),)}
721 with self.assertRaisesRegex(
722 ssl.CertificateError,
723 r"hostname 'com' doesn't match '\*.com'"):
724 ssl.match_hostname(cert, 'com')
725
726 # extra checks for _inet_paton()
727 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
728 with self.assertRaises(ValueError):
729 ssl._inet_paton(invalid)
730 for ipaddr in ['127.0.0.1', '192.168.0.1']:
731 self.assertTrue(ssl._inet_paton(ipaddr))
732 if hasattr(socket, 'AF_INET6'):
733 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
734 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200735
Antoine Pitroud5323212010-10-22 18:19:07 +0000736 def test_server_side(self):
737 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000739 with socket.socket() as sock:
740 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
741 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000742
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown
    # channel binding type.
    s = socket.socket(socket.AF_INET)
    # Register the cleanups immediately after each socket is created so
    # that neither one leaks when bind/listen/connect, the wrap, or the
    # assertion below fails part-way through the test.
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    # The handshake is deliberately skipped: only the argument check of
    # get_channel_binding() is exercised here.
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected SSL socket must report None for the known
    # "tls-unique" binding type, both client-side and server-side.
    wrap_variants = (
        {},                                            # client side
        {"server_side": True, "certfile": CERTFILE},   # server side
    )
    for wrap_kwargs in wrap_variants:
        plain = socket.socket(socket.AF_INET)
        with test_wrap_socket(plain, **wrap_kwargs) as wrapped:
            self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200766
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr().
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
774
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths tuple.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_FILE / SSL_CERT_DIR set in the environment must be
    # reflected in the cafile / capath fields.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
786
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    store_names = ("CA", "ROOT")

    # Both system stores are expected to be non-empty.
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    # The store name is mandatory, and an empty name is rejected.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Every entry is a (cert bytes, encoding, trust) triple; collect the
    # OIDs of all entries whose trust value is a set.
    seen_trust_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert_bytes, encoding, trust = entry
            self.assertIsInstance(cert_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_trust_oids |= trust

    # At least one certificate must be trusted for serverAuth.
    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200811
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    # The "CA" store is expected to contain at least one CRL.
    self.assertTrue(ssl.enum_crls("CA"))
    # The store name is mandatory, and an empty name is rejected.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Every entry is a (crl bytes, encoding) pair.
    ca_crls = ssl.enum_crls("CA")
    self.assertIsInstance(ca_crls, list)
    for entry in ca_crls:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100827 def test_asn1object(self):
828 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
829 '1.3.6.1.5.5.7.3.1')
830
831 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
832 self.assertEqual(val, expected)
833 self.assertEqual(val.nid, 129)
834 self.assertEqual(val.shortname, 'serverAuth')
835 self.assertEqual(val.longname, 'TLS Web Server Authentication')
836 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
839
840 val = ssl._ASN1Object.fromnid(129)
841 self.assertEqual(val, expected)
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100844 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
845 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100846 for i in range(1000):
847 try:
848 obj = ssl._ASN1Object.fromnid(i)
849 except ValueError:
850 pass
851 else:
852 self.assertIsInstance(obj.nid, int)
853 self.assertIsInstance(obj.shortname, str)
854 self.assertIsInstance(obj.longname, str)
855 self.assertIsInstance(obj.oid, (str, type(None)))
856
857 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
858 self.assertEqual(val, expected)
859 self.assertIsInstance(val, ssl._ASN1Object)
860 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
861 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
862 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100863 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
864 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100865
Christian Heimes72d28502013-11-23 13:56:58 +0100866 def test_purpose_enum(self):
867 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
868 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
873 '1.3.6.1.5.5.7.3.1')
874
875 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
876 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
881 '1.3.6.1.5.5.7.3.2')
882
def test_unsupported_dtls(self):
    # Datagram sockets cannot be wrapped: both the module-level helper
    # and SSLContext.wrap_socket() must raise NotImplementedError with
    # the same message.
    expected_message = "only stream sockets are supported"
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as raised:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(raised.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as raised:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(raised.exception), expected_message)
893
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that *timestring* converts to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
896
def cert_time_fail(self, timestring):
    # Helper: assert that converting *timestring* raises ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
900
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returned wrong results
    # when the local timezone was not UTC.
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_conversions:
        self.cert_time_ok(timestring, timestamp)
908
def test_cert_time_to_seconds(self):
    reference_string = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference_string, reference_ts)
    # The argument may be passed by keyword; this pins its name.
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference_string),
                     reference_ts)
    # Both %e and %d day forms are accepted (space or zero generated by
    # strftime), and parsing is case-insensitive.
    for accepted in ("Jan 05 09:34:43 2018 GMT",
                     "JaN 5 09:34:43 2018 GmT"):
        self.cert_time_ok(accepted, reference_ts)

    # Malformed inputs, each paired with the reason it is rejected.
    rejected = (
        ("Jan 5 09:34 2018 GMT", "no seconds"),
        ("Jan 5 09:34:43 2018", "no GMT"),
        ("Jan 5 09:34:43 2018 UTC", "not GMT timezone"),
        ("Jan 35 09:34:43 2018 GMT", "invalid day"),
        ("Jon 5 09:34:43 2018 GMT", "invalid month"),
        ("Jan 5 24:00:00 2018 GMT", "invalid hour"),
        ("Jan 5 09:60:43 2018 GMT", "invalid minute"),
    )
    for bad_string, _reason in rejected:
        self.cert_time_fail(bad_string)

    # A leap second and the following midnight map to the same timestamp.
    newyear_ts = 1230768000.0
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

    # Seconds 59, 60 (even when not a real leap second) and 61 (second
    # leap second, for compatibility with time.strptime()) are accepted;
    # 62 is not.
    seconds_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for timestring, timestamp in seconds_cases:
        self.cert_time_ok(timestring, timestamp)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # No special treatment for the special value
    # 99991231235959Z (rfc 5280).
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
943
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # cert_time_to_seconds() must not depend on the active locale.

    # Abbreviated name of February under the locale selected by the
    # decorator.
    february_here = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if february_here.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is accepted; the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february_here + " 9 00:00:00 2007 GMT")
958
def test_connect_ex_error(self):
    # connect_ex() against a port that is bound but not listening must
    # return an error code instead of raising.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
974
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100975
Antoine Pitrou152efa22010-05-16 18:19:27 +0000976class ContextTests(unittest.TestCase):
977
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # Without an argument the context uses PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
986
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute reports the constant the context was
    # created with.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
992
993 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.set_ciphers("ALL")
996 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000997 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000998 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in the name of any suite enabled
    # on a fresh client context.
    banned_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in banned_markers:
            self.assertNotIn(marker, suite_name)
1012
Christian Heimes25bfcd52016-09-06 00:04:45 +02001013 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1014 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001016 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001017 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001018 self.assertIn('AES256-GCM-SHA384', names)
1019 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001020
@skip_if_broken_ubuntu_ssl
def test_options(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value ...
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # ... and SSLContext also enables these by default.
    for extra_flag in (OP_NO_COMPRESSION, OP_CIPHER_SERVER_PREFERENCE,
                       OP_SINGLE_DH_USE, OP_SINGLE_ECDH_USE,
                       OP_ENABLE_MIDDLEBOX_COMPAT):
        expected |= extra_flag
    self.assertEqual(expected, ctx.options)

    # Setting an additional flag is always possible.
    ctx.options = ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Clearing flags is refused when can_clear_options() is false.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options &= ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 def test_verify_mode_protocol(self):
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 # Default value
1046 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1047 ctx.verify_mode = ssl.CERT_OPTIONAL
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1049 ctx.verify_mode = ssl.CERT_REQUIRED
1050 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1051 ctx.verify_mode = ssl.CERT_NONE
1052 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1053 with self.assertRaises(TypeError):
1054 ctx.verify_mode = None
1055 with self.assertRaises(ValueError):
1056 ctx.verify_mode = 42
1057
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 self.assertFalse(ctx.check_hostname)
1061
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1064 self.assertTrue(ctx.check_hostname)
1065
Christian Heimes61d478c2018-01-27 15:51:38 +01001066 def test_hostname_checks_common_name(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertTrue(ctx.hostname_checks_common_name)
1069 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1070 ctx.hostname_checks_common_name = True
1071 self.assertTrue(ctx.hostname_checks_common_name)
1072 ctx.hostname_checks_common_name = False
1073 self.assertFalse(ctx.hostname_checks_common_name)
1074 ctx.hostname_checks_common_name = True
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 else:
1077 with self.assertRaises(AttributeError):
1078 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001079
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Exercise the SSLContext.minimum_version / maximum_version
    # properties.  The steps below are stateful: each assertion depends
    # on the assignments made before it, so their order matters.

    # A fresh server context reports the MINIMUM_SUPPORTED /
    # MAXIMUM_SUPPORTED placeholders.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Concrete versions are read back exactly as they were set.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_1
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.TLSv1_1
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1_2
    )

    # The lower bound can be reset to the placeholder while the upper
    # bound is a concrete version.
    ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    ctx.maximum_version = ssl.TLSVersion.TLSv1
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1
    )

    # The upper bound can be reset to its placeholder as well.
    ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Using the "wrong" placeholder for a bound reads back as a concrete
    # version; which one is build-dependent, hence assertIn.
    ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    self.assertIn(
        ctx.maximum_version,
        {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
    )

    ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
    )

    # A value that is not a known version is rejected.
    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created for one specific protocol version still reports
    # the placeholders, but refuses any change to either bound.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )
    with self.assertRaises(ValueError):
        ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        ctx.maximum_version = ssl.TLSVersion.TLSv1
1141
1142
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value: VERIFY_DEFAULT, plus VERIFY_X509_TRUSTED_FIRST when
    # the ssl module defines it
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Single flags and a combination are all read back exactly as set
    # (supports any value).
    for flags in (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    ):
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1162
def test_load_cert_chain(self):
    pem_error = "PEM lib"

    # --- Combined key and cert in a single file ---
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for keyfile in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=keyfile)
    # certfile is mandatory
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for broken_cert in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(broken_cert)

    # --- Separate key and cert ---
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # A cert without key, a key without cert, and swapped arguments are
    # all reported as PEM errors.
    for incomplete in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(incomplete)
    with self.assertRaisesRegex(ssl.SSLError, pem_error):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # --- Mismatching key and cert ---
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # --- Password protected key and cert ---
    # The password may be given as str, bytes or bytearray.
    for password in (KEY_PASSWORD,
                     KEY_PASSWORD.encode(),
                     bytearray(KEY_PASSWORD.encode())):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=password)
    for password in (KEY_PASSWORD,
                     KEY_PASSWORD.encode(),
                     bytearray(KEY_PASSWORD.encode())):
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, password)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # --- Password callback ---
    def raise_from_callback():
        raise Exception('getpass error')

    class PasswordProvider:
        # Used both as a callable instance and via a bound method.
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        lambda: KEY_PASSWORD,                      # str
        lambda: KEY_PASSWORD.encode(),             # bytes
        lambda: bytearray(KEY_PASSWORD.encode()),  # bytearray
        PasswordProvider(),                        # callable instance
        PasswordProvider().getpass,                # bound method
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=raise_from_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=raise_from_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001245
def test_load_verify_locations(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # cafile may be str or bytes, passed positionally or by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        server_ctx.load_verify_locations(cafile)
        server_ctx.load_verify_locations(cafile=cafile, capath=None)
    # At least one location has to be supplied.
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        server_ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        server_ctx.load_verify_locations(BADCERT)
    # cafile together with capath; capath may be str or bytes.
    server_ctx.load_verify_locations(CERTFILE, CAPATH)
    server_ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        server_ctx.load_verify_locations(None, True)
1264
def test_load_verify_cadata(self):
    """Exercise load_verify_locations(cadata=...) with PEM, DER and bad data."""

    def fresh_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # Read both CA certificates as PEM text and derive their DER encodings.
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # PEM: the last entry reloads a cert that is already in the store,
    # so the count must not grow.
    context = fresh_context()
    expect_ca_count(context, 0)
    for pem_data, expected in ((cacert_pem, 1),
                               (neuronio_pem, 2),
                               (neuronio_pem, 2)):
        context.load_verify_locations(cadata=pem_data)
        expect_ca_count(context, expected)

    # PEM: both certificates in a single string
    context = fresh_context()
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=both_pem)
    expect_ca_count(context, 2)

    # PEM: junk text around (and a duplicate among) the certificates
    context = fresh_context()
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(pieces))
    expect_ca_count(context, 2)

    # DER: two separate loads, then a reload of an already known cert
    context = fresh_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    expect_ca_count(context, 2)
    context.load_verify_locations(cadata=cacert_der)
    expect_ca_count(context, 2)

    # DER: both certificates concatenated
    context = fresh_context()
    both_der = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=both_der)
    expect_ca_count(context, 2)

    # Error cases: wrong type, broken text data, broken binary data
    context = fresh_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1321
1322
def test_load_dh_params(self):
    """load_dh_params() accepts a DH file path and rejects bad arguments."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_dh_params(DHFILE)
    # The bytes path variant is not exercised on Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as missing:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # A certificate file is not a DH parameter file.
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1335
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    """A new context of every protocol reports all-zero session stats."""
    counter_names = (
        'number',
        'connect', 'connect_good', 'connect_renegotiate',
        'accept', 'accept_good', 'accept_renegotiate',
        'hits', 'misses', 'timeouts', 'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1353
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001354 def test_set_default_verify_paths(self):
1355 # There's not much we can do to test that it acts as expected,
1356 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001357 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001358 ctx.set_default_verify_paths()
1359
Antoine Pitrou501da612011-12-21 09:27:41 +01001360 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001361 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001363 ctx.set_ecdh_curve("prime256v1")
1364 ctx.set_ecdh_curve(b"prime256v1")
1365 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1367 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1369
@needs_sni
def test_sni_callback(self):
    """set_servername_callback() accepts only a callable or None."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # No argument at all, then several non-callable arguments.
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    # Both accepted forms must go through without raising.
    context.set_servername_callback(None)
    context.set_servername_callback(noop_callback)
1384
@needs_sni
def test_sni_callback_refcycle(self):
    """A context <-> servername callback reference cycle gets collected."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold the context, while the
    # context holds the callback: a reference cycle.
    def noop_callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(noop_callback)
    context_ref = weakref.ref(context)
    # Drop every local strong reference; only the cycle keeps them alive.
    del noop_callback, context
    gc.collect()
    self.assertIsNone(context_ref())
1397
def test_cert_store_stats(self):
    """cert_store_stats() tracks what load_verify_locations() added."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 0})

    # Each step: (loader, file, stats expected afterwards).
    # load_cert_chain() is expected to leave the store stats untouched.
    steps = (
        (context.load_cert_chain, CERTFILE,
         {'x509_ca': 0, 'crl': 0, 'x509': 0}),
        (context.load_verify_locations, CERTFILE,
         {'x509_ca': 0, 'crl': 0, 'x509': 1}),
        (context.load_verify_locations, CAFILE_CACERT,
         {'x509_ca': 1, 'crl': 0, 'x509': 2}),
    )
    for loader, path, expected_stats in steps:
        loader(path)
        self.assertEqual(context.cert_store_stats(), expected_stats)
1411
def test_get_ca_certs(self):
    """get_ca_certs() returns loaded CA certs as dicts or as DER bytes."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    decoded_certs = context.get_ca_certs()

    # Issuer and subject of the expected certificate carry the same name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(decoded_certs, [expected_cert])

    # In binary form the same certificate comes back DER encoded.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1439
Christian Heimes72d28502013-11-23 13:56:58 +01001440 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001441 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001442 ctx.load_default_certs()
1443
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001445 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1446 ctx.load_default_certs()
1447
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001449 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1450
Christian Heimesa170fa12017-09-15 20:27:30 +02001451 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001452 self.assertRaises(TypeError, ctx.load_default_certs, None)
1453 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1454
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    """SSL_CERT_DIR / SSL_CERT_FILE steer load_default_certs()."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, location in overrides:
            env[variable] = location
        context.load_default_certs()
        # Exactly one non-CA certificate is expected in the store.
        expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected_stats)
1464
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    """With the env vars set, one more x509 cert is loaded than by default."""
    # Baseline: what load_default_certs() gives without the variables.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        for variable, location in (("SSL_CERT_DIR", CAPATH),
                                   ("SSL_CERT_FILE", CERTFILE)):
            env[variable] = location
        context.load_default_certs()
        # One additional certificate is expected on top of the baseline.
        expected_stats["x509"] = expected_stats["x509"] + 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1479
def _assert_context_options(self, ctx):
    """Assert that *ctx* has the expected option flags set.

    OP_NO_SSLv2 is always checked; each other flag is checked only when
    its constant is non-zero.
    """
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag == 0:
            # A zero constant carries no bit to test for.
            continue
        self.assertEqual(ctx.options & flag, flag)
1494
def test_create_default_context(self):
    """create_default_context() yields the expected protocol and modes."""

    def verify(context, verify_mode, hostname_checked=False):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked:
            self.assertTrue(context.check_hostname)
        self._assert_context_options(context)

    # Default purpose: certificates required and hostname checked.
    verify(ssl.create_default_context(), ssl.CERT_REQUIRED,
           hostname_checked=True)

    # All three CA sources given at once.
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_pem)
    verify(context, ssl.CERT_REQUIRED)

    # CLIENT_AUTH purpose: no certificate is required from the peer.
    verify(ssl.create_default_context(ssl.Purpose.CLIENT_AUTH),
           ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001515
def test__create_stdlib_context(self):
    """_create_stdlib_context() honours its protocol and verify arguments."""

    def verify(context, protocol, verify_mode, hostname_checked=None):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        # check_hostname is only asserted where an expectation is given.
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    # Defaults
    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    # Explicit protocol
    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol plus verification settings
    strict_context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                                cert_reqs=ssl.CERT_REQUIRED,
                                                check_hostname=True)
    verify(strict_context,
           ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED, hostname_checked=True)

    # CLIENT_AUTH purpose
    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
Christian Heimes1aa9a752013-12-02 02:41:19 +01001541 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001545
Christian Heimese82c0342017-09-15 20:29:57 +02001546 # Auto set CERT_REQUIRED
1547 ctx.check_hostname = True
1548 self.assertTrue(ctx.check_hostname)
1549 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1550 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001551 ctx.verify_mode = ssl.CERT_REQUIRED
1552 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001553 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001554
Christian Heimese82c0342017-09-15 20:29:57 +02001555 # Changing verify_mode does not affect check_hostname
1556 ctx.check_hostname = False
1557 ctx.verify_mode = ssl.CERT_NONE
1558 ctx.check_hostname = False
1559 self.assertFalse(ctx.check_hostname)
1560 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1561 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001562 ctx.check_hostname = True
1563 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001564 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1565
1566 ctx.check_hostname = False
1567 ctx.verify_mode = ssl.CERT_OPTIONAL
1568 ctx.check_hostname = False
1569 self.assertFalse(ctx.check_hostname)
1570 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1571 # keep CERT_OPTIONAL
1572 ctx.check_hostname = True
1573 self.assertTrue(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
1576 # Cannot set CERT_NONE with check_hostname enabled
1577 with self.assertRaises(ValueError):
1578 ctx.verify_mode = ssl.CERT_NONE
1579 ctx.check_hostname = False
1580 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001581 ctx.verify_mode = ssl.CERT_NONE
1582 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001583
Christian Heimes5fe668c2016-09-12 00:01:11 +02001584 def test_context_client_server(self):
1585 # PROTOCOL_TLS_CLIENT has sane defaults
1586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1587 self.assertTrue(ctx.check_hostname)
1588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 # PROTOCOL_TLS_SERVER has different but also sane defaults
1591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1592 self.assertFalse(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1594
Christian Heimes4df60f12017-09-15 20:26:05 +02001595 def test_context_custom_class(self):
1596 class MySSLSocket(ssl.SSLSocket):
1597 pass
1598
1599 class MySSLObject(ssl.SSLObject):
1600 pass
1601
1602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1603 ctx.sslsocket_class = MySSLSocket
1604 ctx.sslobject_class = MySSLObject
1605
1606 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1607 self.assertIsInstance(sock, MySSLSocket)
1608 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1609 self.assertIsInstance(obj, MySSLObject)
1610
Antoine Pitrou152efa22010-05-16 18:19:27 +00001611
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError, its subclasses and their attributes."""

    def test_str(self):
        """str() of an SSLError is the message only; errno stays available."""
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        """The library and reason attributes reflect the failing call."""
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # A certificate file is not a DH parameter file, so this must fail.
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"), message)

    def test_subclass(self):
        """A non-blocking handshake that needs input raises SSLWantReadError."""
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            client = socket.socket()
            # Safety net: make sure the plain socket is closed even when
            # connect(), setblocking() or wrap_socket() raises before the
            # SSL wrapper's `with` block takes over.  On the normal path
            # the socket has already been closed through the wrapper by
            # the time cleanups run.
            self.addCleanup(client.close)
            client.connect(listener.getsockname())
            client.setblocking(False)
            with ctx.wrap_socket(client, False,
                                 do_handshake_on_connect=False) as tls_client:
                # Nothing ever accepts or answers on the listening socket,
                # so the handshake cannot complete.
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    tls_client.do_handshake()
                message = str(cm.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        """wrap_bio() rejects empty, dot-prefixed and NUL-containing names."""
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001666
1667
class MemoryBIOTests(unittest.TestCase):
    """Tests for the in-memory buffer object ssl.MemoryBIO."""

    def test_read_write(self):
        """Written bytes are read back in order, fully or in pieces."""
        buf = ssl.MemoryBIO()

        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')

        # Two writes are read back as one contiguous chunk.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')

        # Sized reads, including one past the end of the data.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        """eof only becomes true after write_eof() and draining the data."""
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)

        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        # Data is still pending, so eof is not reported yet.
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)

        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        """pending follows the number of bytes not yet read."""
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)

        # Drain one byte at a time: 2, 1, then 0 bytes left.
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)

        # Refill one byte at a time: 1, 2, then 3 bytes pending.
        for written in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, written)

        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        """write() accepts bytes, bytearray and memoryview objects."""
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        """write() rejects objects that are not bytes-like."""
        buf = ssl.MemoryBIO()
        for not_bytes_like in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_bytes_like)
1729
1730
class SSLObjectTests(unittest.TestCase):
    """Tests for direct use of the ssl.SSLObject class."""

    def test_private_init(self):
        """Calling SSLObject directly fails, mentioning the constructor."""
        memory_bio = ssl.MemoryBIO()
        self.assertRaisesRegex(TypeError, "public constructor",
                               ssl.SSLObject, memory_bio, memory_bio)
1736
1737
Martin Panter3840b2a2016-03-27 01:53:46 +00001738class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001739 """Tests that connect to a simple server running in the background"""
1740
def setUp(self):
    """Create the echo server for this test and register its teardown."""
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    # Address the test methods connect to.
    self.server_addr = (HOST, echo_server.port)
    # Enter the server's context manager by hand; the matching exit is
    # registered as a cleanup so it runs once the test has finished.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001746
def test_connect(self):
    """Connect without and with verification of the server certificate."""
    # No verification requested: the decoded peer certificate is empty.
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001761
def test_connect_fail(self):
    """Requiring a certificate without any CA certs makes connect() fail."""
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    plain_socket = socket.socket(socket.AF_INET)
    conn = test_wrap_socket(plain_socket, cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    server_addr = self.server_addr
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001771
def test_connect_ex(self):
    """connect_ex() returns 0 and leaves a verified connection behind."""
    # Issue #11326: check connect_ex() implementation
    plain_socket = socket.socket(socket.AF_INET)
    conn = test_wrap_socket(plain_socket,
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001780
def test_non_blocking_connect_ex(self):
    """A non-blocking connect_ex() can be followed by a manual handshake."""
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], 5.0)

    # Non-blocking handshake: retry until it stops asking for more I/O,
    # waiting for the direction it asked for in between.
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], 5.0)
        else:
            handshake_done = True

    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001806
def test_connect_with_context(self):
    """Like test_connect, but wrapping through an explicit SSLContext."""
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrap_new_socket(**wrap_options):
        return context.wrap_socket(socket.socket(socket.AF_INET),
                                   **wrap_options)

    with wrap_new_socket() as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())

    # Same with a server hostname
    with wrap_new_socket(server_hostname="dummy") as conn:
        conn.connect(self.server_addr)

    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrap_new_socket() as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1824
def test_connect_with_context_fail(self):
    """CERT_REQUIRED on a context without CA certs makes connect() fail."""
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    server_addr = self.server_addr
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001835
def test_connect_capath(self):
    """Verify the server certificate through a str and a bytes capath."""
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    for ca_directory in (CAPATH, BYTES_CAPATH):
        # A new context per variant, so each path is tested on its own.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=ca_directory)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001858
def test_connect_cadata(self):
    """Verify the server certificate using CA data given as PEM and DER."""
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    ca_der = ssl.PEM_cert_to_DER_cert(ca_pem)

    # PEM text first, then the same certificate in DER form.
    for ca_data in (ca_pem, ca_der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=ca_data)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001879
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Closing the SSL socket closes its fd even after makefile() was used."""
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    conn = test_wrap_socket(socket.socket(socket.AF_INET))
    conn.connect(self.server_addr)
    descriptor = conn.fileno()
    file_view = conn.makefile()
    file_view.close()
    # The fd is still open
    os.read(descriptor, 0)
    # Closing the SSL socket should close the fd too
    conn.close()
    gc.collect()
    with self.assertRaises(OSError) as closed_read:
        os.read(descriptor, 0)
    self.assertEqual(closed_read.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001898
def test_non_blocking_handshake(self):
    # Connect in blocking mode, then switch to non-blocking and drive
    # the TLS handshake by hand, waiting in select() whenever OpenSSL
    # reports that it needs to read or write.
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    tls = test_wrap_socket(plain,
                           cert_reqs=ssl.CERT_NONE,
                           do_handshake_on_connect=False)
    self.addCleanup(tls.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            select.select([tls], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [tls], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001919
def test_get_server_certificate(self):
    # Run the shared certificate-retrieval check against the background
    # server, using the signing CA for the verified fetch.
    address = self.server_addr
    _test_get_server_certificate(self, *address, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001922
def test_get_server_certificate_fail(self):
    # A failed connection crashes ThreadedEchoServer, which is why this
    # check lives in a test method of its own.
    address = self.server_addr
    _test_get_server_certificate_fail(self, *address)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001927
def test_ciphers(self):
    # Both well-known cipher strings must allow a connection.
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)

    # A nonsense cipher string must be rejected.  The error may surface
    # either when the socket is wrapped or when it connects, so both
    # steps sit inside the assertion.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw:
            conn = test_wrap_socket(raw,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001941
def test_get_ca_certs_capath(self):
    # Certificates from a capath directory are loaded on request:
    # nothing is listed before the handshake, one entry afterwards.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])

    raw = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)

    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001953
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    def make_context():
        # Client context that trusts the certificates under CAPATH.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(capath=CAPATH)
        return context

    original = make_context()
    replacement = make_context()
    raw = socket.socket(socket.AF_INET)
    with original.wrap_socket(raw, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level object start
        # out bound to the context that created them...
        self.assertIs(conn.context, original)
        self.assertIs(conn._sslobj.context, original)
        # ...and both follow when the context attribute is reassigned.
        conn.context = replacement
        self.assertIs(conn.context, replacement)
        self.assertIs(conn._sslobj.context, replacement)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001969
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    # Minimal I/O pump for an SSLObject backed by memory BIOs.
    #
    # func(*args) is called repeatedly until it stops reporting
    # WANT_READ / WANT_WRITE.  After every call whatever sits in the
    # outgoing BIO is pushed to the socket; on WANT_READ, bytes received
    # from the socket are fed to the incoming BIO.  Any other SSLError
    # propagates.  The only keyword consulted is 'timeout' (seconds,
    # default 10); keyword arguments are not forwarded to func.
    budget = kwargs.get('timeout', 10)
    deadline = time.monotonic() + budget
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        want = None
        try:
            result = func(*args)
        except ssl.SSLError as exc:
            want = exc.errno
            if want not in (ssl.SSL_ERROR_WANT_READ,
                            ssl.SSL_ERROR_WANT_WRITE):
                raise
        # Flush pending output to the peer whether or not func succeeded.
        sock.sendall(outgoing.read())
        if want is None:
            # func completed without asking for more I/O.
            break
        if want == ssl.SSL_ERROR_WANT_READ:
            received = sock.recv(32768)
            if received:
                incoming.write(received)
            else:
                # Peer closed the TCP stream: signal EOF to the BIO.
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return result
2006
def test_bio_handshake(self):
    # Drive a full TLS handshake and shutdown through memory BIOs and
    # check what the SSLObject reports before and after the handshake.
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    tls = context.wrap_bio(bio_in, bio_out, False,
                           SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake.
    self.assertIs(tls._sslobj.owner, tls)
    self.assertIsNone(tls.cipher())
    self.assertIsNone(tls.version())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertRaises(ValueError, tls.getpeercert)
    if has_tls_unique:
        self.assertIsNone(tls.get_channel_binding('tls-unique'))

    self.ssl_io_loop(tcp, bio_in, bio_out, tls.do_handshake)

    # State after the handshake.
    self.assertTrue(tls.cipher())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertIsNotNone(tls.version())
    self.assertTrue(tls.getpeercert())
    if has_tls_unique:
        self.assertTrue(tls.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(tcp, bio_in, bio_out, tls.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # Writing after the shutdown attempt must fail.
    self.assertRaises(ssl.SSLError, tls.write, b'foo')
2040
def test_bio_read_write_data(self):
    # Send one request through a BIO-backed SSLObject and check that
    # the lower-cased echo comes back.
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    tls = context.wrap_bio(bio_in, bio_out, False)

    def pump(operation, *params):
        # Run one SSLObject operation to completion over the socket.
        return self.ssl_io_loop(tcp, bio_in, bio_out, operation, *params)

    pump(tls.do_handshake)
    request = b'FOO\n'
    pump(tls.write, request)
    reply = pump(tls.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(tls.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002056
2057
class NetworkedTests(unittest.TestCase):
    # Tests that talk to hosts on the public internet; each one runs
    # inside support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            conn = test_wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            # A timeout this small is expected to expire before the
            # TCP connection can be established.
            conn.settimeout(1e-7)
            result = conn.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        remote = 'ipv6.google.com'
        with support.transient_internet(remote):
            _test_get_server_certificate(self, remote, 443)
            _test_get_server_certificate_fail(self, remote, 443)
2079
Martin Panter3840b2a2016-03-27 01:53:46 +00002080
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate served at host:port, without and with a CA file.

    The first fetch uses the defaults of ssl.get_server_certificate();
    the second passes *cert* as ca_certs.  *test* is the running
    TestCase; its fail() is called if either fetch yields an empty
    result.
    """
    address = (host, port)

    unverified = ssl.get_server_certificate(address)
    if not unverified:
        test.fail("No server certificate on %s:%s!" % address)

    verified = ssl.get_server_certificate(address, ca_certs=cert)
    if not verified:
        test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, verified))
2091
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA fails.

    ssl.get_server_certificate() is expected to raise ssl.SSLError; if
    it returns a certificate instead, *test*.fail() is called.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % err)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2101
2102
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002103from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002104
class ThreadedEchoServer(threading.Thread):
    """Echo server for the SSL tests, running in its own thread.

    The server accepts one connection at a time; each connection is
    served by a ConnectionHandler thread that the accept loop joins
    before accepting the next one.  Use it as a context manager: entering
    starts the thread and waits until it is listening, leaving stops and
    joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # SSL-wrapped socket while TLS is active, otherwise None.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's SSL context.

            Returns True on success.  On failure the error text is
            appended to server.conn_errors, the connection is closed and
            False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # Unlike the branch below, this one leaves the server
                # running.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS layer if active, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write through the TLS layer if active, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS socket if active, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an error.

            Recognised messages (after stripping whitespace): b'over'
            closes the connection; b'STARTTLS' / b'ENDTLS' switch TLS on
            and off when the server was created with starttls_server;
            b'CB tls-unique' replies with the channel binding data.
            Anything else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # Only shut TLS down when a TLS layer exists.  On
                        # a STARTTLS server the connection may still be
                        # (or again be) plaintext, and calling unwrap()
                        # on None would raise AttributeError and leave
                        # the socket unclosed.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        # NOTE(review): sslconn is used here without a
                        # check, so this request is only valid while TLS
                        # is active.
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If *context* is given it is used as-is and the other SSL-related
        arguments are ignored.  Otherwise a new SSLContext is built from
        *ssl_version* (default PROTOCOL_TLS_SERVER), *certreqs* (default
        CERT_NONE), *cacerts*, *certificate*, *npn_protocols*,
        *alpn_protocols* and *ciphers*.  *chatty* and *connectionchatty*
        control diagnostic output; *starttls_server* makes connections
        start in plaintext.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event that run() sets once the server is listening.
        self.flag = None
        self.active = False
        # Per-connection results recorded by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        # Block until run() reports that the socket is listening.
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # The short timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Connections are served strictly one after another.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop to finish."""
        self.active = False
2349
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread built on asyncore.dispatcher.

    Use it as a context manager: entering starts the thread and waits
    until its loop is running, leaving stops and joins it.
    """

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that creates one handler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server-side TLS connection echoing data back lower-cased."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data already buffered inside the SSL socket
                # before reporting this channel as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Clause order matters: SSLError is re-raised before the
                # generic OSError clause can swallow it.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return None
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                    # Any other OSError is ignored.
                    return None
                self._ssl_accepting = False
                return None

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                chunk = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(chunk))
                if chunk:
                    self.send(chunk.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        # Optional event that run() sets once the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        def report(text):
            # Progress output, only shown in verbose runs.
            if support.verbose:
                sys.stdout.write(text)

        report(" cleanup: stopping server.\n")
        self.stop()
        report(" cleanup: joining server thread.\n")
        self.join()
        report(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start(self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Everything raised by the loop is swallowed so the thread
            # keeps polling until stop() is called.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002467
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect to it with
    a socket wrapped by *client_context*, and send *indata* three times
    (as bytes, bytearray and memoryview), checking that each reply is the
    lower-cased input.

    Return a dict describing the client connection (compression, cipher,
    peer certificate, negotiated protocols, TLS version, session data)
    together with the protocol and cipher lists recorded by the server.
    Raise AssertionError if a reply does not match.
    """
    # NOTE(review): the server is always created with
    # connectionchatty=False; the connectionchatty argument only affects
    # the client-side messages below.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    stats = {}
    with server:
        client_sock = client_context.wrap_socket(
            socket.socket(), server_hostname=sni_name, session=session)
        with client_sock as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Record the connection details while the socket is open.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002517
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false
    one means it must fail.  When *expect_success* is a string it is
    additionally compared with the protocol version the connection
    reports.  AssertionError is raised when the outcome differs.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as err:
        if expect_success or err.errno != errno.ECONNRESET:
            raise
        return

    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2576
2577
2578class ThreadedTests(unittest.TestCase):
2579
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Every generic protocol works with one context on both ends; the
    # role-specific protocols are covered separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol not in role_specific:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(CERTFILE)
                server_params_test(context, context,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Correct roles: client context on the client, server context on
    # the server.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Wrong roles: each entry is (client label, server label, context
    # used as client, context used as server, extra keyword arguments).
    # NOTE(review): the last entry is labelled CLIENT/CLIENT but passes
    # the same contexts as the first entry - confirm whether
    # client_context was meant on both sides.
    wrong_role_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in wrong_role_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(caught.exception))
2627
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002628 def test_getpeercert(self):
2629 if support.verbose:
2630 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002631
2632 client_context, server_context, hostname = testing_context()
2633 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002634 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002635 with client_context.wrap_socket(socket.socket(),
2636 do_handshake_on_connect=False,
2637 server_hostname=hostname) as s:
2638 s.connect((HOST, server.port))
2639 # getpeercert() raise ValueError while the handshake isn't
2640 # done.
2641 with self.assertRaises(ValueError):
2642 s.getpeercert()
2643 s.do_handshake()
2644 cert = s.getpeercert()
2645 self.assertTrue(cert, "Can't get peer certificate.")
2646 cipher = s.cipher()
2647 if support.verbose:
2648 sys.stdout.write(pprint.pformat(cert) + '\n')
2649 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2650 if 'subject' not in cert:
2651 self.fail("No subject field in certificate: %s." %
2652 pprint.pformat(cert))
2653 if ((('organizationName', 'Python Software Foundation'),)
2654 not in cert['subject']):
2655 self.fail(
2656 "Missing or invalid 'organizationName' field in certificate subject; "
2657 "should be 'Python Software Foundation'.")
2658 self.assertIn('notBefore', cert)
2659 self.assertIn('notAfter', cert)
2660 before = ssl.cert_time_to_seconds(cert['notBefore'])
2661 after = ssl.cert_time_to_seconds(cert['notAfter'])
2662 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002663
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664 @unittest.skipUnless(have_verify_flags(),
2665 "verify_flags need OpenSSL > 0.9.8")
2666 def test_crl_check(self):
2667 if support.verbose:
2668 sys.stdout.write("\n")
2669
Christian Heimesa170fa12017-09-15 20:27:30 +02002670 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002671
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002673 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002674
2675 # VERIFY_DEFAULT should pass
2676 server = ThreadedEchoServer(context=server_context, chatty=True)
2677 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002678 with client_context.wrap_socket(socket.socket(),
2679 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002680 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681 cert = s.getpeercert()
2682 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002683
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002684 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002685 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002686
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687 server = ThreadedEchoServer(context=server_context, chatty=True)
2688 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002689 with client_context.wrap_socket(socket.socket(),
2690 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002691 with self.assertRaisesRegex(ssl.SSLError,
2692 "certificate verify failed"):
2693 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002694
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002695 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002696 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002697
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 server = ThreadedEchoServer(context=server_context, chatty=True)
2699 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002700 with client_context.wrap_socket(socket.socket(),
2701 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002702 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002703 cert = s.getpeercert()
2704 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002705
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 def test_check_hostname(self):
2707 if support.verbose:
2708 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002709
Christian Heimesa170fa12017-09-15 20:27:30 +02002710 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002711
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002712 # correct hostname should verify
2713 server = ThreadedEchoServer(context=server_context, chatty=True)
2714 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002715 with client_context.wrap_socket(socket.socket(),
2716 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002717 s.connect((HOST, server.port))
2718 cert = s.getpeercert()
2719 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002720
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002721 # incorrect hostname should raise an exception
2722 server = ThreadedEchoServer(context=server_context, chatty=True)
2723 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002724 with client_context.wrap_socket(socket.socket(),
2725 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002726 with self.assertRaisesRegex(
2727 ssl.CertificateError,
2728 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002729 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002730
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002731 # missing server_hostname arg should cause an exception, too
2732 server = ThreadedEchoServer(context=server_context, chatty=True)
2733 with server:
2734 with socket.socket() as s:
2735 with self.assertRaisesRegex(ValueError,
2736 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002737 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002738
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002739 def test_ecc_cert(self):
2740 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2741 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002742 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002743 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2744
2745 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2746 # load ECC cert
2747 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2748
2749 # correct hostname should verify
2750 server = ThreadedEchoServer(context=server_context, chatty=True)
2751 with server:
2752 with client_context.wrap_socket(socket.socket(),
2753 server_hostname=hostname) as s:
2754 s.connect((HOST, server.port))
2755 cert = s.getpeercert()
2756 self.assertTrue(cert, "Can't get peer certificate.")
2757 cipher = s.cipher()[0].split('-')
2758 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2759
2760 def test_dual_rsa_ecc(self):
2761 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2762 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002763 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2764 # algorithms.
2765 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002766 # only ECDSA certs
2767 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2768 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2769
2770 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2771 # load ECC and RSA key/cert pairs
2772 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2773 server_context.load_cert_chain(SIGNED_CERTFILE)
2774
2775 # correct hostname should verify
2776 server = ThreadedEchoServer(context=server_context, chatty=True)
2777 with server:
2778 with client_context.wrap_socket(socket.socket(),
2779 server_hostname=hostname) as s:
2780 s.connect((HOST, server.port))
2781 cert = s.getpeercert()
2782 self.assertTrue(cert, "Can't get peer certificate.")
2783 cipher = s.cipher()[0].split('-')
2784 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2785
Christian Heimes66e57422018-01-29 14:25:13 +01002786 def test_check_hostname_idn(self):
2787 if support.verbose:
2788 sys.stdout.write("\n")
2789
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002790 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002791 server_context.load_cert_chain(IDNSANSFILE)
2792
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002793 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002794 context.verify_mode = ssl.CERT_REQUIRED
2795 context.check_hostname = True
2796 context.load_verify_locations(SIGNING_CA)
2797
2798 # correct hostname should verify, when specified in several
2799 # different ways
2800 idn_hostnames = [
2801 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002802 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002803 ('xn--knig-5qa.idn.pythontest.net',
2804 'xn--knig-5qa.idn.pythontest.net'),
2805 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002806 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002807
2808 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002809 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002810 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2811 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2812 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002813 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2814
2815 # ('königsgäßchen.idna2008.pythontest.net',
2816 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2817 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2818 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2819 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2820 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2821
Christian Heimes66e57422018-01-29 14:25:13 +01002822 ]
2823 for server_hostname, expected_hostname in idn_hostnames:
2824 server = ThreadedEchoServer(context=server_context, chatty=True)
2825 with server:
2826 with context.wrap_socket(socket.socket(),
2827 server_hostname=server_hostname) as s:
2828 self.assertEqual(s.server_hostname, expected_hostname)
2829 s.connect((HOST, server.port))
2830 cert = s.getpeercert()
2831 self.assertEqual(s.server_hostname, expected_hostname)
2832 self.assertTrue(cert, "Can't get peer certificate.")
2833
Christian Heimes66e57422018-01-29 14:25:13 +01002834 # incorrect hostname should raise an exception
2835 server = ThreadedEchoServer(context=server_context, chatty=True)
2836 with server:
2837 with context.wrap_socket(socket.socket(),
2838 server_hostname="python.example.org") as s:
2839 with self.assertRaises(ssl.CertificateError):
2840 s.connect((HOST, server.port))
2841
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002842 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002843 """Connecting when the server rejects the client's certificate
2844
2845 Launch a server with CERT_REQUIRED, and check that trying to
2846 connect to it with a wrong client certificate fails.
2847 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002848 client_context, server_context, hostname = testing_context()
2849 # load client cert
2850 client_context.load_cert_chain(WRONG_CERT)
2851 # require TLS client authentication
2852 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002853 # TLS 1.3 has different handshake
2854 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002855
2856 server = ThreadedEchoServer(
2857 context=server_context, chatty=True, connectionchatty=True,
2858 )
2859
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002860 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002861 client_context.wrap_socket(socket.socket(),
2862 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002863 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002864 # Expect either an SSL error about the server rejecting
2865 # the connection, or a low-level connection reset (which
2866 # sometimes happens on Windows)
2867 s.connect((HOST, server.port))
2868 except ssl.SSLError as e:
2869 if support.verbose:
2870 sys.stdout.write("\nSSLError is %r\n" % e)
2871 except OSError as e:
2872 if e.errno != errno.ECONNRESET:
2873 raise
2874 if support.verbose:
2875 sys.stdout.write("\nsocket.error is %r\n" % e)
2876 else:
2877 self.fail("Use of invalid cert should have failed!")
2878
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002879 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2880 def test_wrong_cert_tls13(self):
2881 client_context, server_context, hostname = testing_context()
2882 client_context.load_cert_chain(WRONG_CERT)
2883 server_context.verify_mode = ssl.CERT_REQUIRED
2884 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2885 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2886
2887 server = ThreadedEchoServer(
2888 context=server_context, chatty=True, connectionchatty=True,
2889 )
2890 with server, \
2891 client_context.wrap_socket(socket.socket(),
2892 server_hostname=hostname) as s:
2893 # TLS 1.3 perform client cert exchange after handshake
2894 s.connect((HOST, server.port))
2895 try:
2896 s.write(b'data')
2897 s.read(4)
2898 except ssl.SSLError as e:
2899 if support.verbose:
2900 sys.stdout.write("\nSSLError is %r\n" % e)
2901 except OSError as e:
2902 if e.errno != errno.ECONNRESET:
2903 raise
2904 if support.verbose:
2905 sys.stdout.write("\nsocket.error is %r\n" % e)
2906 else:
2907 self.fail("Use of invalid cert should have failed!")
2908
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002909 def test_rude_shutdown(self):
2910 """A brutal shutdown of an SSL server should raise an OSError
2911 in the client when attempting handshake.
2912 """
2913 listener_ready = threading.Event()
2914 listener_gone = threading.Event()
2915
2916 s = socket.socket()
2917 port = support.bind_port(s, HOST)
2918
2919 # `listener` runs in a thread. It sits in an accept() until
2920 # the main thread connects. Then it rudely closes the socket,
2921 # and sets Event `listener_gone` to let the main thread know
2922 # the socket is gone.
2923 def listener():
2924 s.listen()
2925 listener_ready.set()
2926 newsock, addr = s.accept()
2927 newsock.close()
2928 s.close()
2929 listener_gone.set()
2930
2931 def connector():
2932 listener_ready.wait()
2933 with socket.socket() as c:
2934 c.connect((HOST, port))
2935 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002936 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 ssl_sock = test_wrap_socket(c)
2938 except OSError:
2939 pass
2940 else:
2941 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002942
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 t = threading.Thread(target=listener)
2944 t.start()
2945 try:
2946 connector()
2947 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002948 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002949
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002950 def test_ssl_cert_verify_error(self):
2951 if support.verbose:
2952 sys.stdout.write("\n")
2953
2954 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2955 server_context.load_cert_chain(SIGNED_CERTFILE)
2956
2957 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2958
2959 server = ThreadedEchoServer(context=server_context, chatty=True)
2960 with server:
2961 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002962 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002963 try:
2964 s.connect((HOST, server.port))
2965 except ssl.SSLError as e:
2966 msg = 'unable to get local issuer certificate'
2967 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2968 self.assertEqual(e.verify_code, 20)
2969 self.assertEqual(e.verify_message, msg)
2970 self.assertIn(msg, repr(e))
2971 self.assertIn('certificate verify failed', repr(e))
2972
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 @skip_if_broken_ubuntu_ssl
2974 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2975 "OpenSSL is compiled without SSLv2 support")
2976 def test_protocol_sslv2(self):
2977 """Connecting to an SSLv2 server with various client options"""
2978 if support.verbose:
2979 sys.stdout.write("\n")
2980 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2981 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2982 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002983 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002984 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2985 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2986 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2987 # SSLv23 client with specific SSL options
2988 if no_sslv2_implies_sslv3_hello():
2989 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002990 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002991 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002992 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002993 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002994 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002996
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002997 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002998 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002999 """Connecting to an SSLv23 server with various client options"""
3000 if support.verbose:
3001 sys.stdout.write("\n")
3002 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003003 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003004 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003005 except OSError as x:
3006 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3007 if support.verbose:
3008 sys.stdout.write(
3009 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3010 % str(x))
3011 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003012 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3013 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3014 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003015
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003016 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003017 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3018 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3019 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003020
3021 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003022 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3023 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3024 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003025
3026 # Server with specific SSL options
3027 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003028 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003029 server_options=ssl.OP_NO_SSLv3)
3030 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003031 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003032 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003033 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003034 server_options=ssl.OP_NO_TLSv1)
3035
3036
3037 @skip_if_broken_ubuntu_ssl
3038 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3039 "OpenSSL is compiled without SSLv3 support")
3040 def test_protocol_sslv3(self):
3041 """Connecting to an SSLv3 server with various client options"""
3042 if support.verbose:
3043 sys.stdout.write("\n")
3044 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3045 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3046 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3047 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3048 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003049 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003050 client_options=ssl.OP_NO_SSLv3)
3051 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3052 if no_sslv2_implies_sslv3_hello():
3053 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003054 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003055 False, client_options=ssl.OP_NO_SSLv2)
3056
3057 @skip_if_broken_ubuntu_ssl
3058 def test_protocol_tlsv1(self):
3059 """Connecting to a TLSv1 server with various client options"""
3060 if support.verbose:
3061 sys.stdout.write("\n")
3062 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3063 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3064 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3065 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3066 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3067 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3068 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003069 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003070 client_options=ssl.OP_NO_TLSv1)
3071
3072 @skip_if_broken_ubuntu_ssl
3073 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3074 "TLS version 1.1 not supported.")
3075 def test_protocol_tlsv1_1(self):
3076 """Connecting to a TLSv1.1 server with various client options.
3077 Testing against older TLS versions."""
3078 if support.verbose:
3079 sys.stdout.write("\n")
3080 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3081 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3082 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3083 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3084 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003085 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 client_options=ssl.OP_NO_TLSv1_1)
3087
Christian Heimesa170fa12017-09-15 20:27:30 +02003088 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003089 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3090 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3091
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003092 @skip_if_broken_ubuntu_ssl
3093 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3094 "TLS version 1.2 not supported.")
3095 def test_protocol_tlsv1_2(self):
3096 """Connecting to a TLSv1.2 server with various client options.
3097 Testing against older TLS versions."""
3098 if support.verbose:
3099 sys.stdout.write("\n")
3100 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3101 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3102 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3103 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3104 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3105 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3106 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003107 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003108 client_options=ssl.OP_NO_TLSv1_2)
3109
Christian Heimesa170fa12017-09-15 20:27:30 +02003110 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003111 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3112 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3113 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3114 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3115
    def test_starttls(self):
        """Switching from clear text to encrypted and back again."""
        # The messages are sent in order; "STARTTLS" and "ENDTLS" are the
        # two that make the client switch between the plain socket `s` and
        # the wrapped socket `conn`.
        msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

        server = ThreadedEchoServer(CERTFILE,
                                    starttls_server=True,
                                    chatty=True,
                                    connectionchatty=True)
        # True while traffic goes through `conn` rather than `s`.
        wrapped = False
        with server:
            s = socket.socket()
            s.setblocking(1)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                # Send on whichever socket is currently active and read the
                # server's reply from the same one.
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    # `conn` is first bound here; it is only used while
                    # `wrapped` is True.
                    conn = test_wrap_socket(s)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    # unwrap() returns the socket to continue with in clear
                    # text; it replaces `s`.
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Send the final b"over\n" message and close on whichever socket
            # is active at the end of the exchange.
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
            if wrapped:
                conn.close()
            else:
                s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003172
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003173 def test_socketserver(self):
3174 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003175 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003176 # try to connect
3177 if support.verbose:
3178 sys.stdout.write('\n')
3179 with open(CERTFILE, 'rb') as f:
3180 d1 = f.read()
3181 d2 = ''
3182 # now fetch the same data from the HTTPS server
3183 url = 'https://localhost:%d/%s' % (
3184 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003185 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003186 f = urllib.request.urlopen(url, context=context)
3187 try:
3188 dlen = f.info().get("content-length")
3189 if dlen and (int(dlen) > 0):
3190 d2 = f.read(int(dlen))
3191 if support.verbose:
3192 sys.stdout.write(
3193 " client: read %d bytes from remote server '%s'\n"
3194 % (len(d2), server))
3195 finally:
3196 f.close()
3197 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003198
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003199 def test_asyncore_server(self):
3200 """Check the example asyncore integration."""
3201 if support.verbose:
3202 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003203
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003204 indata = b"FOO\n"
3205 server = AsyncoreEchoServer(CERTFILE)
3206 with server:
3207 s = test_wrap_socket(socket.socket())
3208 s.connect(('127.0.0.1', server.port))
3209 if support.verbose:
3210 sys.stdout.write(
3211 " client: sending %r...\n" % indata)
3212 s.write(indata)
3213 outdata = s.read()
3214 if support.verbose:
3215 sys.stdout.write(" client: read %r\n" % outdata)
3216 if outdata != indata.lower():
3217 self.fail(
3218 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3219 % (outdata[:20], len(outdata),
3220 indata[:20].lower(), len(indata)))
3221 s.write(b"over\n")
3222 if support.verbose:
3223 sys.stdout.write(" client: closing connection.\n")
3224 s.close()
3225 if support.verbose:
3226 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003227
def test_recv_send(self):
    """Test recv(), send() and friends."""
    # Every send*/recv* flavour of a connected SSL socket is exercised
    # against a ThreadedEchoServer; replies are compared with the
    # lower-cased request.  Methods flagged "expect success = False" must
    # raise a ValueError whose message starts with the method name.
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    # "!r" is a conversion; the former ":r" was a format
                    # spec, which bytes objects reject with TypeError.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # "!s" converts the exception to text; ":s" is not
                    # accepted by exception objects.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the request was sent before the receive
                # call was rejected, so its echo is still pending.
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        # Negative lengths are rejected without touching the connection.
        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3363
def test_recv_zero(self):
    # Zero-length reads on an SSL socket must return immediately with
    # no data, in blocking and in non-blocking mode.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain.close)
    tls = test_wrap_socket(plain, suppress_ragged_eofs=False)
    self.addCleanup(tls.close)

    # recv/read(0) should return no data
    tls.send(b"data")
    for zero_length_read in (tls.recv, tls.read):
        self.assertEqual(zero_length_read(0), b"")
    # ... and must not have consumed the pending echo.
    self.assertEqual(tls.read(), b"data")

    # Should not block if the other end sends no data
    tls.setblocking(False)
    self.assertEqual(tls.recv(0), b"")
    empty_target = bytearray()
    self.assertEqual(tls.recv_into(empty_target), 0)
3383
def test_nonblocking_send(self):
    # A non-blocking SSL socket has to raise SSLWantWriteError or
    # SSLWantReadError once it can no longer accept outgoing data.
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call would block.
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Switch back to blocking mode before shutting down.
        client.setblocking(True)
        client.close()
3413
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    #
    # The listener below accepts TCP connections but never answers, so a
    # handshake attempted against it can only finish by timing out.
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        listener.listen()
        listening.set()
        # Accepted sockets are kept referenced so they hang around
        # rather than being closed by garbage collection.
        accepted = []
        while not finish:
            readable = select.select([listener], [], [], 0.1)[0]
            if listener in readable:
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Variant 1: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Variant 2: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell serve() to leave its loop, then wait for it.
        finish = True
        worker.join()
        listener.close()
3462
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    # Filled in by the server thread once accept() returns.
    accepted = {'remote': None, 'peer': None}

    def serve():
        listener.listen()
        # Signal readiness, then block on the accept.
        ready.set()
        accepted['remote'], accepted['peer'] = listener.accept()
        # Echo four bytes back to the client.
        accepted['remote'].send(accepted['remote'].recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client waits until the server is set up, then connects.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['remote'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['remote'], ssl.SSLSocket)
    self.assertEqual(accepted['peer'], client_addr)
3503
3504 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003505 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003506 with context.wrap_socket(socket.socket()) as sock:
3507 with self.assertRaises(OSError) as cm:
3508 sock.getpeercert()
3509 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3510
3511 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003512 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003513 with context.wrap_socket(socket.socket()) as sock:
3514 with self.assertRaises(OSError) as cm:
3515 sock.do_handshake()
3516 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3517
def test_no_shared_ciphers(self):
    # With disjoint cipher lists the connection attempt must fail and
    # the server must record a "no shared cipher" error.
    client_ctx, server_ctx, sni = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_ctx.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    client_ctx.set_ciphers("AES128")
    server_ctx.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as conn:
        with self.assertRaises(OSError):
            conn.connect((HOST, server.port))
    # Inspected only after the server context manager has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3531
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            # Before connecting there is neither a version nor an
            # underlying SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(negotiated, 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(negotiated, 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(negotiated, ('TLSv1', 'TLSv1.2'))
        # After the socket's context manager has exited, both are
        # None again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3555
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0, 1.1 and 1.2 switched off, the connection has to
    # negotiate TLSv1.3 with one of the cipher suites listed below.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    ctx.options |= older_versions_off
    accepted_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    # The same context is used for the server and the client side.
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, server.port))
        self.assertIn(conn.cipher()[0], accepted_suites)
        self.assertEqual(conn.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003573
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # minimum_version / maximum_version on both contexts decide which
    # protocol version is negotiated, or make the handshake fail.
    client_ctx, server_ctx, sni = testing_context()
    V = ssl.TLSVersion

    def pin(ctx, lowest, highest):
        # The minimum is always assigned before the maximum.
        ctx.minimum_version = lowest
        ctx.maximum_version = highest

    def expect_version(wanted):
        # Connect to a fresh echo server and check the negotiated version.
        with ThreadedEchoServer(context=server_ctx) as server:
            with client_ctx.wrap_socket(socket.socket(),
                                        server_hostname=sni) as conn:
                conn.connect((HOST, server.port))
                self.assertEqual(conn.version(), wanted)

    # client TLSv1.0 to 1.2, server only TLSv1.2
    pin(client_ctx, V.TLSv1, V.TLSv1_2)
    pin(server_ctx, V.TLSv1_2, V.TLSv1_2)
    expect_version('TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    pin(server_ctx, V.TLSv1, V.TLSv1_1)
    expect_version('TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    pin(server_ctx, V.TLSv1_2, V.TLSv1_2)
    pin(client_ctx, V.TLSv1, V.TLSv1)
    with ThreadedEchoServer(context=server_ctx) as server:
        with client_ctx.wrap_socket(socket.socket(),
                                    server_hostname=sni) as conn:
            with self.assertRaises(ssl.SSLError) as caught:
                conn.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3612
3613
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 and a server whose minimum is lowered to
    # SSLv3 must end up negotiating SSLv3.
    client_ctx, server_ctx, sni = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_ctx.minimum_version = sslv3
    client_ctx.minimum_version = sslv3
    client_ctx.maximum_version = sslv3
    with ThreadedEchoServer(context=server_ctx) as server, \
            client_ctx.wrap_socket(socket.socket(),
                                   server_hostname=sni) as conn:
        conn.connect((HOST, server.port))
        self.assertEqual(conn.version(), 'SSLv3')
3627
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, server.port))
        cipher_name = conn.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3647
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    # Two successive connections each fetch their "tls-unique" binding,
    # sanity-check it, and compare it with the value the echo server
    # reports for the same connection.  The two bindings must differ.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def fetch_and_verify(verbose_template):
        # Run one complete round on a fresh connection and return the
        # channel binding data obtained on the client side.
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(verbose_template.format(data))

            # check if it is sane
            self.assertIsNotNone(data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(data), 48)
            else:
                self.assertEqual(len(data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(data).encode("us-ascii"))
            return data

    with server:
        cb_data = fetch_and_verify(
            " got channel binding data: {0!r}\n")
        # now, again; this round validates the *new* value (the old code
        # re-checked cb_data here and never inspected new_cb_data).
        new_cb_data = fetch_and_verify(
            "got another channel binding data: {0!r}\n")
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003707
def test_compression(self):
    # The negotiated compression reported for the connection must be
    # None, 'ZLIB' or 'RLE'.
    client_ctx, server_ctx, sni = testing_context()
    handshake = server_params_test(client_ctx, server_ctx,
                                   sni_name=sni,
                                   chatty=True, connectionchatty=True)
    negotiated = handshake['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, {None, 'ZLIB', 'RLE'})
3716
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both sides, no compression may be
    # reported for the connection.
    client_ctx, server_ctx, sni = testing_context()
    for ctx in (client_ctx, server_ctx):
        ctx.options |= ssl.OP_NO_COMPRESSION
    handshake = server_params_test(client_ctx, server_ctx,
                                   sni_name=sni,
                                   chatty=True, connectionchatty=True)
    self.assertIs(handshake['compression'], None)
3727
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"][0] is the negotiated cipher's name; a DH suite
    # carries one of these tokens between the dashes of that name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole name (cipher[0] was only its first character).
        self.fail("Non-DH cipher: " + cipher)
3743
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Handshakes with an explicit ECDH curve on one or both sides.

    def handshake(client_curve=None, server_curve=None):
        # Build fresh contexts, apply the requested curves, restrict
        # the server to ECDHE suites on TLS >= 1.2, and connect.
        client_ctx, server_ctx, sni = testing_context()
        if client_curve is not None:
            client_ctx.set_ecdh_curve(client_curve)
        if server_curve is not None:
            server_ctx.set_ecdh_curve(server_curve)
        server_ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        server_ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    handshake(server_curve="secp384r1")

    # server auto, client secp384r1
    handshake(client_curve="secp384r1")

    # server / client curve mismatch
    rejected = False
    try:
        handshake(client_curve="prime256v1", server_curve="secp384r1")
    except ssl.SSLError:
        rejected = True
    # OpenSSL 1.0.2 does not fail although it should.
    if not rejected and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3782
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_ctx, server_ctx, sni = testing_context()
    handshake = server_params_test(client_ctx, server_ctx,
                                   sni_name=sni,
                                   chatty=True, connectionchatty=True)
    self.assertIs(handshake['client_alpn_protocol'], None)
3790
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    # Here only the server advertises protocols.
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    handshake = server_params_test(client_ctx, server_ctx,
                                   sni_name=sni,
                                   chatty=True, connectionchatty=True)
    self.assertIs(handshake['client_alpn_protocol'], None)
3800
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client offer, check which ALPN protocol the client and
    # the server report as selected.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_alpn_protocols(server_protocols)
        client_ctx.set_alpn_protocols(client_protocols)

        # A handshake failure is kept as the outcome, not re-raised.
        try:
            outcome = server_params_test(client_ctx,
                                         server_ctx,
                                         chatty=True,
                                         connectionchatty=True,
                                         sni_name=sni)
        except ssl.SSLError as handshake_error:
            outcome = handshake_error

        fails_without_overlap = (
            IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if expected is None and fails_without_overlap:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))
        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        reported = outcome['server_alpn_protocols']
        if reported:
            server_result = reported[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3840
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_ctx, server_ctx, sni = testing_context()
    handshake = server_params_test(client_ctx, server_ctx,
                                   sni_name=sni,
                                   chatty=True, connectionchatty=True)
    self.assertIs(handshake['client_npn_protocol'], None)
3848
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client offer, check which NPN protocol the client and
    # the server report as selected.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx.set_npn_protocols(client_protocols)
        handshake = server_params_test(client_ctx, server_ctx,
                                       sni_name=sni,
                                       chatty=True, connectionchatty=True)
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))
        client_result = handshake['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        reported = handshake['server_npn_protocols']
        if reported:
            server_result = reported[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3874
def sni_contexts(self):
    # Build the contexts shared by the SNI tests and return them as
    # (server_context, other_context, client_context):
    #   server_context -- TLS server context holding SIGNED_CERTFILE
    #   other_context  -- TLS server context holding SIGNED_CERTFILE2
    #   client_context -- TLS client context trusting SIGNING_CA

    def serving(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = serving(SIGNED_CERTFILE)
    other_context = serving(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3883
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains an entry
    # whose only attribute is ('commonName', name).
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
3887
@needs_sni
def test_sni_callback(self):
    # The servername callback sees the name sent by the client and may
    # swap the connection's context, which changes the certificate the
    # client receives.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    def connect_with(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    stats = connect_with('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate from other_context was served.
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # The callback is called with server_name=None
    stats = connect_with(None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = connect_with('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3928
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, other_context, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        # Answer every handshake with the "access denied" alert.
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(deny_access)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    failure = caught.exception
    self.assertEqual(failure.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003942
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def divide_by_zero(ssl_sock, server_name, initial_context):
        # Deliberately raises ZeroDivisionError inside the callback.
        1/0

    server_context.set_servername_callback(divide_by_zero)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    # The callback's exception is reported on stderr.
    self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003959
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    # The test also checks that "TypeError" shows up on the captured
    # stderr.
    # The middle context returned by sni_contexts() is not needed here.
    server_context, _, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        # A str is the "wrong" return value under test.
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as cm, \
            support.captured_stderr() as stderr:
        # The call is expected to raise, so its result is not bound.
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003977
def test_shared_ciphers(self):
    # The client offers "AES128:AES256" while the server is restricted to
    # "AES256".  Every cipher name the server reports as shared must
    # contain at least one of the accepted fragments listed below.
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")
    accepted_fragments = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_context, server_context,
                                sni_name=hostname)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _protocol, _bits in shared:
        for fragment in accepted_fragments:
            if fragment in cipher_name:
                break
        else:
            # No accepted fragment matched this cipher name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003995
def test_read_write_after_close_raises_valuerror(self):
    # After the client SSL socket has been connected and then closed,
    # both read() and write() on it must raise ValueError.
    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)

    with echo_server:
        raw_sock = socket.socket()
        s = client_context.wrap_socket(raw_sock, server_hostname=hostname)
        s.connect((HOST, echo_server.port))
        s.close()

        with self.assertRaises(ValueError):
            s.read(1024)
        with self.assertRaises(ValueError):
            s.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004008
def test_sendfile(self):
    # Write 512 bytes to a temporary file, push it through
    # SSLSocket.sendfile() to a ThreadedEchoServer and check that the
    # bytes received back equal the bytes written.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as sink:
        sink.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # A single context is used for both the server and the client socket.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    echo_server = ThreadedEchoServer(context=context, chatty=False)
    with echo_server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as source:
            s.sendfile(source)
            self.assertEqual(s.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004025
def test_session(self):
    # Run four handshakes against the same server context -- fresh,
    # reused, fresh, reused -- and check the client-side session objects
    # as well as the server's 'accept' and 'hits' counters after each.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server round trip; `extra` may carry session=...
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_counters(accept, hits):
        # Compare the server context's session statistics.
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    check_counters(accept=1, hits=0)

    # reuse session
    second = handshake(session=session)
    check_counters(accept=2, hits=1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    # Equal to the original session, but a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_counters(accept=3, hits=1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004083
def test_session_handling(self):
    # Check the rules for reading and assigning SSLSocket.session on
    # client sockets that talk to a single ThreadedEchoServer:
    #   * session and session_reused are None before the handshake,
    #   * assigning a non-SSLSession value raises TypeError,
    #   * assigning after the handshake raises ValueError,
    #   * assigning before connect() leads to a reused session,
    #   * assigning a session obtained through another SSLContext raises
    #     ValueError.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment lives inside assertRaises: it is the
            # statement expected to raise, so anything placed after it
            # in the same block could never run on the passing path.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004133
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004134
def test_main(verbose=False):
    """Run the SSL test classes of this module.

    When ``support.verbose`` is set, a banner describing the OpenSSL
    version, the platform and a few ssl module constants is printed first.
    All certificate files the tests depend on must exist, otherwise
    ``support.TestFailed`` is raised.  ``NetworkedTests`` is only added
    when the 'network' resource is enabled.

    The *verbose* parameter is accepted but not read by this function; the
    banner is controlled by ``support.verbose``.
    """
    if support.verbose:
        import warnings
        # (label, probe) pairs tried in order.  linux_distribution is
        # looked up with getattr() so that building this table does not
        # raise AttributeError on interpreters where the function has been
        # removed from the platform module.
        platform_probes = [
            ('Linux', getattr(platform, 'linux_distribution', None)),
            ('Mac', platform.mac_ver),
            ('Windows', platform.win32_ver),
        ]
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                DeprecationWarning,
            )
            for name, func in platform_probes:
                if func is None:
                    # Probe not available in this interpreter.
                    continue
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No probe produced a non-empty first field.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by this ssl module build.
            pass

    # Fail early if any certificate fixture is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # threading_setup()/threading_cleanup() bracket the run; cleanup is
    # performed even when run_unittest() raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004188
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()