blob: 8d98b805b49a66cf1e2c037958c6e729e43c9d99 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The components given in *name* are joined, in order, onto the
    directory that contains this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080058WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
Christian Heimesbd5c7d22018-01-20 15:16:30 +010060CERTFILE_INFO = {
61 'issuer': ((('countryName', 'XY'),),
62 (('localityName', 'Castle Anthrax'),),
63 (('organizationName', 'Python Software Foundation'),),
64 (('commonName', 'localhost'),)),
65 'notAfter': 'Jan 17 19:09:06 2028 GMT',
66 'notBefore': 'Jan 19 19:09:06 2018 GMT',
67 'serialNumber': 'F9BA076D5B6ABD9B',
68 'subject': ((('countryName', 'XY'),),
69 (('localityName', 'Castle Anthrax'),),
70 (('organizationName', 'Python Software Foundation'),),
71 (('commonName', 'localhost'),)),
72 'subjectAltName': (('DNS', 'localhost'),),
73 'version': 3
74}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
Christian Heimes22587792013-11-21 23:56:13 +010076# empty CRL
77CRLFILE = data_file("revocation.crl")
78
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010079# Two keys and certs signed by the same CA (for SNI tests)
80SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020081SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
83SIGNED_CERTFILE_INFO = {
84 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
85 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
86 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
87 'issuer': ((('countryName', 'XY'),),
88 (('organizationName', 'Python Software Foundation CA'),),
89 (('commonName', 'our-ca-server'),)),
90 'notAfter': 'Nov 28 19:09:06 2027 GMT',
91 'notBefore': 'Jan 19 19:09:06 2018 GMT',
92 'serialNumber': '82EDBF41C880919C',
93 'subject': ((('countryName', 'XY'),),
94 (('localityName', 'Castle Anthrax'),),
95 (('organizationName', 'Python Software Foundation'),),
96 (('commonName', 'localhost'),)),
97 'subjectAltName': (('DNS', 'localhost'),),
98 'version': 3
99}
100
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100101SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200102SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100103SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
104SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
105
Martin Panter3840b2a2016-03-27 01:53:46 +0000106# Same certificate as pycacert.pem, but without extra text in file
107SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200108# cert with all kinds of subject alt names
109ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100110IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100111
Martin Panter3d81d932016-01-14 09:36:00 +0000112REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000113
114EMPTYCERT = data_file("nullcert.pem")
115BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000116NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000117BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200118NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200119NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400121DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write the exception currently being handled to stdout when verbose.

    The formatted traceback lines are joined with a single space and
    written after *prefix*.  Nothing is written unless ``support.verbose``
    is set.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return minimum <= ssl._OPENSSL_API_VERSION
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL is version 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return minimum <= ssl.OPENSSL_VERSION_INFO
144
def have_verify_flags():
    """Return True if the linked OpenSSL is version 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return minimum <= ssl.OPENSSL_VERSION_INFO
148
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800149def _have_secp_curves():
150 if not ssl.HAS_ECDH:
151 return False
152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
153 try:
154 ctx.set_ecdh_curve("secp384r1")
155 except ValueError:
156 return False
157 else:
158 return True
159
160
161HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset (``time.altzone``)
    is used when DST is defined and in effect right now, otherwise the
    regular one (``time.timezone``).
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
169
def asn1time(cert_time):
    """Return *cert_time* adjusted for the linked OpenSSL's time printing.

    Only OpenSSL API version 0.9.8i needs an adjustment, because it
    ignores seconds (see #18207): the seconds are zeroed and the value is
    re-rendered.  For every other version *cert_time* is returned as is.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    rendered = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if rendered[4] == "0":
        rendered = rendered[:4] + " " + rendered[5:]
    return rendered
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on a patched OpenSSL that rejects SSLv2.

    When ssl has no PROTOCOL_SSLv2 constant, *func* is returned unchanged.
    Otherwise the wrapper first tries to create an SSLv2 context; if that
    raises SSLError on the specific OpenSSL version / distribution checked
    below, the test is skipped.  In every other case *func* is run.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_version = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            # The distribution is only queried when the version matches.
            if (affected_version and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100200needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching client context, server context and hostname.

    Usage:

        client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; the
    returned hostname is the one associated with that certificate.  Any
    other value raises ValueError.

    Both contexts load SIGNING_CA as a verify location, and the server
    context additionally loads *server_cert* as its certificate chain.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # The CA goes into the *server* context here.  It was previously loaded
    # into the client context a second time, which left the server without
    # any trust anchor for verifying client certificates.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
Antoine Pitrou480a1242010-04-28 21:37:09 +0000279 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000282 sys.stdout.write("\n RAND_status is %d (%s)\n"
283 % (v, (v and "sufficient randomness") or
284 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
286 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
287 self.assertEqual(len(data), 16)
288 self.assertEqual(is_cryptographic, v == 1)
289 if v:
290 data = ssl.RAND_bytes(16)
291 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200292 else:
293 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200294
Victor Stinner1e81a392013-12-19 16:47:04 +0100295 # negative num is invalid
296 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
297 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
298
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100299 if hasattr(ssl, 'RAND_egd'):
300 self.assertRaises(TypeError, ssl.RAND_egd, 1)
301 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000302 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200303 ssl.RAND_add(b"this is a random bytes object", 75.0)
304 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # A forked child and its parent must not produce the same
    # pseudo-random bytes.  The child sends its bytes back over a pipe.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_fd, write_fd = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: never return into the test runner; report success or
        # failure through the exit status only.
        try:
            os.close(read_fd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(write_fd, child_random)
            os.close(write_fd)
        except BaseException:
            os._exit(1)
        os._exit(0)

    # Parent
    os.close(write_fd)
    self.addCleanup(os.close, read_fd)
    exit_status = os.waitpid(child_pid, 0)[1]
    self.assertEqual(exit_status, 0)

    from_child = os.read(read_fd, 16)
    self.assertEqual(len(from_child), 16)
    from_parent = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(from_parent), 16)

    self.assertNotEqual(from_child, from_parent)
337
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    known_certs = (
        (CERTFILE, CERTFILE_INFO),
        (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO),
    )
    for cert_path, expected_info in known_certs:
        self.assertEqual(decode(cert_path), expected_info)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (
        ('DNS', 'projects.developer.nokia.com'),
        ('DNS', 'projects.forum.nokia.com'),
    )
    self.assertEqual(nokia['subjectAltName'], expected_san)

    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    expected_issuers = ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)
    self.assertEqual(nokia['caIssuers'], expected_issuers)
    expected_crl = ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)
    self.assertEqual(nokia['crlDistributionPoints'], expected_crl)
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate names must be preserved in the
    # decoded result.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")

    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    for field in ('subject', 'issuer'):
        self.assertEqual(decoded[field], expected_name)

    # Only the last (IPv6) entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200395
def test_parse_all_sans(self):
    # Every kind of subjectAltName entry in the certificate must be
    # decoded, in order.
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    directory_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', directory_name),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
416
def test_DER_to_PEM(self):
    # Round-trip PEM -> DER -> PEM -> DER: both DER forms must be equal
    # and the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    expected_start = ssl.PEM_HEADER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000428
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000429 def test_openssl_version(self):
430 n = ssl.OPENSSL_VERSION_NUMBER
431 t = ssl.OPENSSL_VERSION_INFO
432 s = ssl.OPENSSL_VERSION
433 self.assertIsInstance(n, int)
434 self.assertIsInstance(t, tuple)
435 self.assertIsInstance(s, str)
436 # Some sanity checks follow
437 # >= 0.9
438 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400439 # < 3.0
440 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000441 major, minor, fix, patch, status = t
442 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400443 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000444 self.assertGreaterEqual(minor, 0)
445 self.assertLess(minor, 256)
446 self.assertGreaterEqual(fix, 0)
447 self.assertLess(fix, 256)
448 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100449 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 self.assertGreaterEqual(status, 0)
451 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400452 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200453 if IS_LIBRESSL:
454 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100455 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400456 else:
457 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100458 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000459
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.  Dropping the only strong reference must therefore free
    # the wrapped socket right away, which the weak reference observes.
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(plain_sock)
    weak = weakref.ref(wrapped)
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(weak(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000470
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock) as ss:
        oserror_calls = (
            (ss.recv, (1,)),
            (ss.recv_into, (bytearray(b'x'),)),
            (ss.recvfrom, (1,)),
            (ss.recvfrom_into, (bytearray(b'x'), 1)),
            (ss.send, (b'x',)),
            (ss.sendto, (b'x', ('0.0.0.0', 0))),
        )
        for method, call_args in oserror_calls:
            self.assertRaises(OSError, method, *call_args)
        # sendmsg() is expected to be rejected outright instead.
        self.assertRaises(NotImplementedError, ss.sendmsg,
                          [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000484
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    candidate_timeouts = (None, 0.0, 5.0)
    for timeout in candidate_timeouts:
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(timeout)
        with test_wrap_socket(plain_sock) as wrapped:
            self.assertEqual(timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000493
def test_errors_sslwrap(self):
    # Argument validation and missing-file errors of ssl.wrap_socket().
    sock = socket.socket()

    # A key file without a certificate file is rejected.
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)

    # Server-side sockets need a non-empty certfile.
    server_side_message = "certfile must be specified for server-side operations"
    with self.assertRaisesRegex(ValueError, server_side_message):
        ssl.wrap_socket(sock, server_side=True)
    with self.assertRaisesRegex(ValueError, server_side_message):
        ssl.wrap_socket(sock, server_side=True, certfile="")

    # A server-side socket refuses to connect().
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))

    # Nonexistent certificate and/or key files are reported as ENOENT.
    missing_file_cases = (
        {"certfile": NONEXISTINGCERT},
        {"certfile": CERTFILE, "keyfile": NONEXISTINGCERT},
        {"certfile": NONEXISTINGCERT, "keyfile": NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522
def bad_cert_test(self, certfile):
    """Assert that wrapping a socket with the given certificate fails.

    *certfile* is a file name resolved against this module's directory
    (or the current directory when that is empty).  Wrapping a new
    socket with it must raise ssl.SSLError; the socket is closed during
    test cleanup.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertRaises(ssl.SSLError, test_wrap_socket, sock,
                      certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000532
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    # The shared helper asserts that wrapping raises ssl.SSLError.
    self.bad_cert_test(certfile="nullcert.pem")
536
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    # The shared helper asserts that wrapping raises ssl.SSLError.
    self.bad_cert_test(certfile="badcert.pem")
540
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    # The shared helper asserts that wrapping raises ssl.SSLError.
    self.bad_cert_test(certfile="badkey.pem")
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 def test_match_hostname(self):
546 def ok(cert, hostname):
547 ssl.match_hostname(cert, hostname)
548 def fail(cert, hostname):
549 self.assertRaises(ssl.CertificateError,
550 ssl.match_hostname, cert, hostname)
551
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100552 # -- Hostname matching --
553
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000554 cert = {'subject': ((('commonName', 'example.com'),),)}
555 ok(cert, 'example.com')
556 ok(cert, 'ExAmple.cOm')
557 fail(cert, 'www.example.com')
558 fail(cert, '.example.com')
559 fail(cert, 'example.org')
560 fail(cert, 'exampleXcom')
561
562 cert = {'subject': ((('commonName', '*.a.com'),),)}
563 ok(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.a.com')
565 fail(cert, 'a.com')
566 fail(cert, 'Xa.com')
567 fail(cert, '.a.com')
568
Mandeep Singhede2ac92017-11-27 04:01:27 +0530569 # only match wildcards when they are the only thing
570 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000571 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530572 fail(cert, 'foo.com')
573 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000574 fail(cert, 'bar.com')
575 fail(cert, 'foo.a.com')
576 fail(cert, 'bar.foo.com')
577
Christian Heimes824f7f32013-08-17 00:54:47 +0200578 # NULL bytes are bad, CVE-2013-4073
579 cert = {'subject': ((('commonName',
580 'null.python.org\x00example.org'),),)}
581 ok(cert, 'null.python.org\x00example.org') # or raise an error?
582 fail(cert, 'example.org')
583 fail(cert, 'null.python.org')
584
Georg Brandl72c98d32013-10-27 07:16:53 +0100585 # error cases with wildcards
586 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
592 cert = {'subject': ((('commonName', 'a.*.com'),),)}
593 fail(cert, 'a.foo.com')
594 fail(cert, 'a..com')
595 fail(cert, 'a.com')
596
597 # wildcard doesn't match IDNA prefix 'xn--'
598 idna = 'püthon.python.org'.encode("idna").decode("ascii")
599 cert = {'subject': ((('commonName', idna),),)}
600 ok(cert, idna)
601 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
602 fail(cert, idna)
603 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
604 fail(cert, idna)
605
606 # wildcard in first fragment and IDNA A-labels in sequent fragments
607 # are supported.
608 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530610 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
611 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
613 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
614
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000615 # Slightly fake real-world example
616 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
617 'subject': ((('commonName', 'linuxfrz.org'),),),
618 'subjectAltName': (('DNS', 'linuxfr.org'),
619 ('DNS', 'linuxfr.com'),
620 ('othername', '<unsupported>'))}
621 ok(cert, 'linuxfr.org')
622 ok(cert, 'linuxfr.com')
623 # Not a "DNS" entry
624 fail(cert, '<unsupported>')
625 # When there is a subjectAltName, commonName isn't used
626 fail(cert, 'linuxfrz.org')
627
628 # A pristine real-world example
629 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
630 'subject': ((('countryName', 'US'),),
631 (('stateOrProvinceName', 'California'),),
632 (('localityName', 'Mountain View'),),
633 (('organizationName', 'Google Inc'),),
634 (('commonName', 'mail.google.com'),))}
635 ok(cert, 'mail.google.com')
636 fail(cert, 'gmail.com')
637 # Only commonName is considered
638 fail(cert, 'California')
639
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100640 # -- IPv4 matching --
641 cert = {'subject': ((('commonName', 'example.com'),),),
642 'subjectAltName': (('DNS', 'example.com'),
643 ('IP Address', '10.11.12.13'),
644 ('IP Address', '14.15.16.17'))}
645 ok(cert, '10.11.12.13')
646 ok(cert, '14.15.16.17')
647 fail(cert, '14.15.16.18')
648 fail(cert, 'example.net')
649
650 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800651 if hasattr(socket, 'AF_INET6'):
652 cert = {'subject': ((('commonName', 'example.com'),),),
653 'subjectAltName': (
654 ('DNS', 'example.com'),
655 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
656 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
657 ok(cert, '2001::cafe')
658 ok(cert, '2003::baba')
659 fail(cert, '2003::bebe')
660 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661
662 # -- Miscellaneous --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Neither commonName nor subjectAltName
665 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
666 'subject': ((('countryName', 'US'),),
667 (('stateOrProvinceName', 'California'),),
668 (('localityName', 'Mountain View'),),
669 (('organizationName', 'Google Inc'),))}
670 fail(cert, 'mail.google.com')
671
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200672 # No DNS entry in subjectAltName but a commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('commonName', 'mail.google.com'),)),
678 'subjectAltName': (('othername', 'blabla'), )}
679 ok(cert, 'mail.google.com')
680
681 # No DNS entry subjectAltName and no commonName
682 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
683 'subject': ((('countryName', 'US'),),
684 (('stateOrProvinceName', 'California'),),
685 (('localityName', 'Mountain View'),),
686 (('organizationName', 'Google Inc'),)),
687 'subjectAltName': (('othername', 'blabla'),)}
688 fail(cert, 'google.com')
689
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000690 # Empty cert / no cert
691 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
692 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
693
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200694 # Issue #17980: avoid denials of service by refusing more than one
695 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800696 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "partial wildcards in leftmost label are not supported"):
700 ssl.match_hostname(cert, 'axxb.example.com')
701
702 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "wildcard can only be present in the leftmost label"):
706 ssl.match_hostname(cert, 'www.sub.example.com')
707
708 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 "too many wildcards"):
712 ssl.match_hostname(cert, 'axxbxxc.example.com')
713
714 cert = {'subject': ((('commonName', '*'),),)}
715 with self.assertRaisesRegex(
716 ssl.CertificateError,
717 "sole wildcard without additional labels are not support"):
718 ssl.match_hostname(cert, 'host')
719
720 cert = {'subject': ((('commonName', '*.com'),),)}
721 with self.assertRaisesRegex(
722 ssl.CertificateError,
723 r"hostname 'com' doesn't match '\*.com'"):
724 ssl.match_hostname(cert, 'com')
725
726 # extra checks for _inet_paton()
727 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
728 with self.assertRaises(ValueError):
729 ssl._inet_paton(invalid)
730 for ipaddr in ['127.0.0.1', '192.168.0.1']:
731 self.assertTrue(ssl._inet_paton(ipaddr))
732 if hasattr(socket, 'AF_INET6'):
733 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
734 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200735
Antoine Pitroud5323212010-10-22 18:19:07 +0000736 def test_server_side(self):
737 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000739 with socket.socket() as sock:
740 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
741 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000742
Antoine Pitroud6494802011-07-21 01:11:30 +0200743 def test_unknown_channel_binding(self):
744 # should raise ValueError for unknown type
745 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200746 s.bind(('127.0.0.1', 0))
747 s.listen()
748 c = socket.socket(socket.AF_INET)
749 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200750 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100751 with self.assertRaises(ValueError):
752 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200753 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
755 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
756 "'tls-unique' channel binding not available")
757 def test_tls_unique_channel_binding(self):
758 # unconnected should return None for known type
759 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200760 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100761 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200762 # the same for server-side
763 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200764 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100765 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200766
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600767 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200768 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600769 r = repr(ss)
770 with self.assertWarns(ResourceWarning) as cm:
771 ss = None
772 support.gc_collect()
773 self.assertIn(r, str(cm.warning.args[0]))
774
Christian Heimes6d7ad132013-06-09 18:02:55 +0200775 def test_get_default_verify_paths(self):
776 paths = ssl.get_default_verify_paths()
777 self.assertEqual(len(paths), 6)
778 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
779
780 with support.EnvironmentVarGuard() as env:
781 env["SSL_CERT_DIR"] = CAPATH
782 env["SSL_CERT_FILE"] = CERTFILE
783 paths = ssl.get_default_verify_paths()
784 self.assertEqual(paths.cafile, CERTFILE)
785 self.assertEqual(paths.capath, CAPATH)
786
Christian Heimes44109d72013-11-22 01:51:30 +0100787 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
788 def test_enum_certificates(self):
789 self.assertTrue(ssl.enum_certificates("CA"))
790 self.assertTrue(ssl.enum_certificates("ROOT"))
791
792 self.assertRaises(TypeError, ssl.enum_certificates)
793 self.assertRaises(WindowsError, ssl.enum_certificates, "")
794
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 trust_oids = set()
796 for storename in ("CA", "ROOT"):
797 store = ssl.enum_certificates(storename)
798 self.assertIsInstance(store, list)
799 for element in store:
800 self.assertIsInstance(element, tuple)
801 self.assertEqual(len(element), 3)
802 cert, enc, trust = element
803 self.assertIsInstance(cert, bytes)
804 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
805 self.assertIsInstance(trust, (set, bool))
806 if isinstance(trust, set):
807 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100808
809 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100810 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200811
Christian Heimes46bebee2013-06-09 19:03:31 +0200812 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100813 def test_enum_crls(self):
814 self.assertTrue(ssl.enum_crls("CA"))
815 self.assertRaises(TypeError, ssl.enum_crls)
816 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200817
Christian Heimes44109d72013-11-22 01:51:30 +0100818 crls = ssl.enum_crls("CA")
819 self.assertIsInstance(crls, list)
820 for element in crls:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 2)
823 self.assertIsInstance(element[0], bytes)
824 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100827 def test_asn1object(self):
828 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
829 '1.3.6.1.5.5.7.3.1')
830
831 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
832 self.assertEqual(val, expected)
833 self.assertEqual(val.nid, 129)
834 self.assertEqual(val.shortname, 'serverAuth')
835 self.assertEqual(val.longname, 'TLS Web Server Authentication')
836 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
839
840 val = ssl._ASN1Object.fromnid(129)
841 self.assertEqual(val, expected)
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100844 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
845 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100846 for i in range(1000):
847 try:
848 obj = ssl._ASN1Object.fromnid(i)
849 except ValueError:
850 pass
851 else:
852 self.assertIsInstance(obj.nid, int)
853 self.assertIsInstance(obj.shortname, str)
854 self.assertIsInstance(obj.longname, str)
855 self.assertIsInstance(obj.oid, (str, type(None)))
856
857 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
858 self.assertEqual(val, expected)
859 self.assertIsInstance(val, ssl._ASN1Object)
860 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
861 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
862 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100863 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
864 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100865
Christian Heimes72d28502013-11-23 13:56:58 +0100866 def test_purpose_enum(self):
867 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
868 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
873 '1.3.6.1.5.5.7.3.1')
874
875 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
876 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
881 '1.3.6.1.5.5.7.3.2')
882
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100883 def test_unsupported_dtls(self):
884 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
885 self.addCleanup(s.close)
886 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200887 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100888 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200889 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100890 with self.assertRaises(NotImplementedError) as cx:
891 ctx.wrap_socket(s)
892 self.assertEqual(str(cx.exception), "only stream sockets are supported")
893
Antoine Pitrouc695c952014-04-28 20:57:36 +0200894 def cert_time_ok(self, timestring, timestamp):
895 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
896
897 def cert_time_fail(self, timestring):
898 with self.assertRaises(ValueError):
899 ssl.cert_time_to_seconds(timestring)
900
901 @unittest.skipUnless(utc_offset(),
902 'local time needs to be different from UTC')
903 def test_cert_time_to_seconds_timezone(self):
904 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
905 # results if local timezone is not UTC
906 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
907 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
908
909 def test_cert_time_to_seconds(self):
910 timestring = "Jan 5 09:34:43 2018 GMT"
911 ts = 1515144883.0
912 self.cert_time_ok(timestring, ts)
913 # accept keyword parameter, assert its name
914 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
915 # accept both %e and %d (space or zero generated by strftime)
916 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
917 # case-insensitive
918 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
919 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
920 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
921 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
922 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
923 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
924 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
925 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
926
927 newyear_ts = 1230768000.0
928 # leap seconds
929 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
930 # same timestamp
931 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
932
933 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
934 # allow 60th second (even if it is not a leap second)
935 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
936 # allow 2nd leap second for compatibility with time.strptime()
937 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
938 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
939
Mike53f7a7c2017-12-14 14:04:53 +0300940 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200941 # 99991231235959Z (rfc 5280)
942 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
943
944 @support.run_with_locale('LC_ALL', '')
945 def test_cert_time_to_seconds_locale(self):
946 # `cert_time_to_seconds()` should be locale independent
947
948 def local_february_name():
949 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
950
951 if local_february_name().lower() == 'feb':
952 self.skipTest("locale-specific month name needs to be "
953 "different from C locale")
954
955 # locale-independent
956 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
957 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
958
Martin Panter3840b2a2016-03-27 01:53:46 +0000959 def test_connect_ex_error(self):
960 server = socket.socket(socket.AF_INET)
961 self.addCleanup(server.close)
962 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200963 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000964 cert_reqs=ssl.CERT_REQUIRED)
965 self.addCleanup(s.close)
966 rc = s.connect_ex((HOST, port))
967 # Issue #19919: Windows machines or VMs hosted on Windows
968 # machines sometimes return EWOULDBLOCK.
969 errors = (
970 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
971 errno.EWOULDBLOCK,
972 )
973 self.assertIn(rc, errors)
974
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100975
Antoine Pitrou152efa22010-05-16 18:19:27 +0000976class ContextTests(unittest.TestCase):
977
Antoine Pitrou23df4832010-08-04 17:14:06 +0000978 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000979 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100980 for protocol in PROTOCOLS:
981 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200982 ctx = ssl.SSLContext()
983 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000984 self.assertRaises(ValueError, ssl.SSLContext, -1)
985 self.assertRaises(ValueError, ssl.SSLContext, 42)
986
Antoine Pitrou23df4832010-08-04 17:14:06 +0000987 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000988 def test_protocol(self):
989 for proto in PROTOCOLS:
990 ctx = ssl.SSLContext(proto)
991 self.assertEqual(ctx.protocol, proto)
992
993 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.set_ciphers("ALL")
996 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000997 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000998 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999
Christian Heimes892d66e2018-01-29 14:10:18 +01001000 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1001 "Test applies only to Python default ciphers")
1002 def test_python_ciphers(self):
1003 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1004 ciphers = ctx.get_ciphers()
1005 for suite in ciphers:
1006 name = suite['name']
1007 self.assertNotIn("PSK", name)
1008 self.assertNotIn("SRP", name)
1009 self.assertNotIn("MD5", name)
1010 self.assertNotIn("RC4", name)
1011 self.assertNotIn("3DES", name)
1012
Christian Heimes25bfcd52016-09-06 00:04:45 +02001013 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1014 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001016 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001017 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001018 self.assertIn('AES256-GCM-SHA384', names)
1019 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001020
Antoine Pitrou23df4832010-08-04 17:14:06 +00001021 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001022 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001024 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001025 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001026 # SSLContext also enables these by default
1027 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001028 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1029 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001030 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001031 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001032 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001033 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001034 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1035 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001036 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001037 # Ubuntu has OP_NO_SSLv3 forced on by default
1038 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001039 else:
1040 with self.assertRaises(ValueError):
1041 ctx.options = 0
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 def test_verify_mode_protocol(self):
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 # Default value
1046 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1047 ctx.verify_mode = ssl.CERT_OPTIONAL
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1049 ctx.verify_mode = ssl.CERT_REQUIRED
1050 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1051 ctx.verify_mode = ssl.CERT_NONE
1052 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1053 with self.assertRaises(TypeError):
1054 ctx.verify_mode = None
1055 with self.assertRaises(ValueError):
1056 ctx.verify_mode = 42
1057
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 self.assertFalse(ctx.check_hostname)
1061
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1064 self.assertTrue(ctx.check_hostname)
1065
Christian Heimes61d478c2018-01-27 15:51:38 +01001066 def test_hostname_checks_common_name(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertTrue(ctx.hostname_checks_common_name)
1069 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1070 ctx.hostname_checks_common_name = True
1071 self.assertTrue(ctx.hostname_checks_common_name)
1072 ctx.hostname_checks_common_name = False
1073 self.assertFalse(ctx.hostname_checks_common_name)
1074 ctx.hostname_checks_common_name = True
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 else:
1077 with self.assertRaises(AttributeError):
1078 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001079
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001080 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1081 "required OpenSSL 1.1.0g")
1082 def test_min_max_version(self):
1083 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1084 self.assertEqual(
1085 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1086 )
1087 self.assertEqual(
1088 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1089 )
1090
1091 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1092 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1093 self.assertEqual(
1094 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1095 )
1096 self.assertEqual(
1097 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1098 )
1099
1100 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1101 ctx.maximum_version = ssl.TLSVersion.TLSv1
1102 self.assertEqual(
1103 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1104 )
1105 self.assertEqual(
1106 ctx.maximum_version, ssl.TLSVersion.TLSv1
1107 )
1108
1109 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1110 self.assertEqual(
1111 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1112 )
1113
1114 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1115 self.assertIn(
1116 ctx.maximum_version,
1117 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1118 )
1119
1120 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1121 self.assertIn(
1122 ctx.minimum_version,
1123 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1124 )
1125
1126 with self.assertRaises(ValueError):
1127 ctx.minimum_version = 42
1128
1129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1130
1131 self.assertEqual(
1132 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1133 )
1134 self.assertEqual(
1135 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1136 )
1137 with self.assertRaises(ValueError):
1138 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1139 with self.assertRaises(ValueError):
1140 ctx.maximum_version = ssl.TLSVersion.TLSv1
1141
1142
Christian Heimes2427b502013-11-23 11:24:32 +01001143 @unittest.skipUnless(have_verify_flags(),
1144 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001145 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001146 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001147 # default value
1148 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1149 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001150 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1151 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1152 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1153 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1154 ctx.verify_flags = ssl.VERIFY_DEFAULT
1155 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1156 # supports any value
1157 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1158 self.assertEqual(ctx.verify_flags,
1159 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1160 with self.assertRaises(TypeError):
1161 ctx.verify_flags = None
1162
Antoine Pitrou152efa22010-05-16 18:19:27 +00001163 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001164 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001165 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001166 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001167 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1168 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001170 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001171 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001172 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001173 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001174 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001175 ctx.load_cert_chain(EMPTYCERT)
1176 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001177 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001178 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1179 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1180 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001181 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001182 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001183 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001184 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001185 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001186 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1187 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001188 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001189 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001190 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001191 # Password protected key and cert
1192 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1193 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1194 ctx.load_cert_chain(CERTFILE_PROTECTED,
1195 password=bytearray(KEY_PASSWORD.encode()))
1196 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1197 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1198 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1199 bytearray(KEY_PASSWORD.encode()))
1200 with self.assertRaisesRegex(TypeError, "should be a string"):
1201 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1202 with self.assertRaises(ssl.SSLError):
1203 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1204 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1205 # openssl has a fixed limit on the password buffer.
1206 # PEM_BUFSIZE is generally set to 1kb.
1207 # Return a string larger than this.
1208 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1209 # Password callback
1210 def getpass_unicode():
1211 return KEY_PASSWORD
1212 def getpass_bytes():
1213 return KEY_PASSWORD.encode()
1214 def getpass_bytearray():
1215 return bytearray(KEY_PASSWORD.encode())
1216 def getpass_badpass():
1217 return "badpass"
1218 def getpass_huge():
1219 return b'a' * (1024 * 1024)
1220 def getpass_bad_type():
1221 return 9
1222 def getpass_exception():
1223 raise Exception('getpass error')
1224 class GetPassCallable:
1225 def __call__(self):
1226 return KEY_PASSWORD
1227 def getpass(self):
1228 return KEY_PASSWORD
1229 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1230 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1232 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1233 ctx.load_cert_chain(CERTFILE_PROTECTED,
1234 password=GetPassCallable().getpass)
1235 with self.assertRaises(ssl.SSLError):
1236 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1237 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1238 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1239 with self.assertRaisesRegex(TypeError, "must return a string"):
1240 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1241 with self.assertRaisesRegex(Exception, "getpass error"):
1242 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1243 # Make sure the password function isn't called if it isn't needed
1244 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001245
1246 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001247 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001248 ctx.load_verify_locations(CERTFILE)
1249 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1250 ctx.load_verify_locations(BYTES_CERTFILE)
1251 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1252 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001253 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001254 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001255 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001256 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001257 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001258 ctx.load_verify_locations(BADCERT)
1259 ctx.load_verify_locations(CERTFILE, CAPATH)
1260 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1261
Victor Stinner80f75e62011-01-29 11:31:20 +00001262 # Issue #10989: crash if the second argument type is invalid
1263 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1264
def test_load_verify_cadata(self):
    """Check load_verify_locations(cadata=...) with PEM text and DER bytes.

    Certificates are loaded one at a time, concatenated, repeated and
    surrounded by junk; after each step the "x509_ca" entry reported by
    cert_store_stats() is compared with the expected count.
    """
    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def ca_count(context):
        return context.cert_store_stats()["x509_ca"]

    # Read both certificates and derive their DER encodings.
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # PEM, one certificate per call
    context = new_context()
    self.assertEqual(ca_count(context), 0)
    context.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(context), 1)
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table: the count must not change
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)

    # PEM, both certificates in a single string
    context = new_context()
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=both_pem)
    self.assertEqual(ca_count(context), 2)

    # PEM, with junk around the certs and one of them repeated
    context = new_context()
    pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(pieces))
    self.assertEqual(ca_count(context), 2)

    # DER, one certificate per call
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table: the count must not change
    context.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(context), 2)

    # DER, both certificates concatenated
    context = new_context()
    both_der = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=both_der)
    self.assertEqual(ca_count(context), 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1321
1322
def test_load_dh_params(self):
    """Check load_dh_params() with valid, missing and unsuitable input."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_dh_params(DHFILE)
    # The bytes path is only exercised when not running on Windows.
    if not os.name == 'nt':
        server_ctx.load_dh_params(BYTES_DHFILE)
    with self.assertRaises(TypeError):
        server_ctx.load_dh_params()
    with self.assertRaises(TypeError):
        server_ctx.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as caught:
        server_ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    # Loading CERTFILE as DH parameters must be rejected.
    with self.assertRaises(ssl.SSLError):
        server_ctx.load_dh_params(CERTFILE)
1335
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    """A freshly created context reports all-zero session statistics.

    Each protocol in PROTOCOLS is checked inside its own subTest so that
    a failure names the offending protocol and the remaining protocols
    are still exercised.
    """
    pristine_stats = {
        'number': 0,
        'connect': 0,
        'connect_good': 0,
        'connect_renegotiate': 0,
        'accept': 0,
        'accept_good': 0,
        'accept_renegotiate': 0,
        'hits': 0,
        'misses': 0,
        'timeouts': 0,
        'cache_full': 0,
    }
    for proto in PROTOCOLS:
        with self.subTest(proto=proto):
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), pristine_stats)
1353
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001354 def test_set_default_verify_paths(self):
1355 # There's not much we can do to test that it acts as expected,
1356 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001357 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001358 ctx.set_default_verify_paths()
1359
Antoine Pitrou501da612011-12-21 09:27:41 +01001360 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001361 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001363 ctx.set_ecdh_curve("prime256v1")
1364 ctx.set_ecdh_curve(b"prime256v1")
1365 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1367 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1369
@needs_sni
def test_sni_callback(self):
    """Check which argument types set_servername_callback() accepts."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        server_ctx.set_servername_callback()
    for not_callable in (4, "", server_ctx):
        with self.assertRaises(TypeError):
            server_ctx.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    server_ctx.set_servername_callback(None)
    server_ctx.set_servername_callback(noop_callback)
1384
@needs_sni
def test_sni_callback_refcycle(self):
    """A context referenced by its own servername callback is collectable."""
    # Reference cycles through the servername callback are detected
    # and cleared.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold a reference to the
    # context, which in turn holds the callback.
    def cyclic_callback(sock, servername, ctx, cycle=server_ctx):
        pass

    server_ctx.set_servername_callback(cyclic_callback)
    ctx_ref = weakref.ref(server_ctx)
    del server_ctx, cyclic_callback
    gc.collect()
    self.assertIsNone(ctx_ref())
1397
def test_cert_store_stats(self):
    """Track cert_store_stats() while certificates are loaded step by step."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_stats(x509_ca, x509):
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    expect_stats(x509_ca=0, x509=0)
    # load_cert_chain() leaves the store statistics unchanged
    ctx.load_cert_chain(CERTFILE)
    expect_stats(x509_ca=0, x509=0)
    # CERTFILE is counted under 'x509' but not under 'x509_ca'
    ctx.load_verify_locations(CERTFILE)
    expect_stats(x509_ca=0, x509=1)
    # CAFILE_CACERT bumps both counters
    ctx.load_verify_locations(CAFILE_CACERT)
    expect_stats(x509_ca=1, x509=2)
1411
def test_get_ca_certs(self):
    """Check get_ca_certs() in decoded-dict form and in binary (DER) form."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    ctx.load_verify_locations(CAFILE_CACERT)
    loaded = ctx.get_ca_certs()

    # The expected issuer and subject are the same name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(loaded, [expected_cert])

    # With a true argument the certificates come back DER-encoded.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(ctx.get_ca_certs(True), [expected_der])
1439
Christian Heimes72d28502013-11-23 13:56:58 +01001440 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001441 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001442 ctx.load_default_certs()
1443
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001445 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1446 ctx.load_default_certs()
1447
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001449 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1450
Christian Heimesa170fa12017-09-15 20:27:30 +02001451 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001452 self.assertRaises(TypeError, ctx.load_default_certs, None)
1453 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1454
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    """load_default_certs() with SSL_CERT_DIR and SSL_CERT_FILE set."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as env:
        for var_name, var_value in overrides:
            env[var_name] = var_value
        ctx.load_default_certs()
        self.assertEqual(ctx.cert_store_stats(), expected_stats)
1464
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    """With the env vars set, exactly one more x509 entry is expected."""
    # Baseline: statistics after a default load without the variables.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected_stats = baseline_ctx.cert_store_stats()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        for var_name, var_value in (("SSL_CERT_DIR", CAPATH),
                                    ("SSL_CERT_FILE", CERTFILE)):
            env[var_name] = var_value
        ctx.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(ctx.cert_store_stats(), expected_stats)
1479
def _assert_context_options(self, ctx):
    """Assert that *ctx* has the expected option bits set.

    OP_NO_SSLv2 is always checked; each of the other options is checked
    only when its constant is non-zero.
    """
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1494
def test_create_default_context(self):
    """Check protocol, verify mode and options of create_default_context()."""
    # Default purpose
    default_ctx = ssl.create_default_context()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(default_ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(default_ctx.check_hostname)
    self._assert_context_options(default_ctx)

    # Default purpose with cafile, capath and cadata all supplied
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    loaded_ctx = ssl.create_default_context(cafile=SIGNING_CA,
                                            capath=CAPATH,
                                            cadata=ca_text)
    self.assertEqual(loaded_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(loaded_ctx.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(loaded_ctx)

    # CLIENT_AUTH purpose: verify mode is expected to be CERT_NONE
    client_auth_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(client_auth_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(client_auth_ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(client_auth_ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001515
def test__create_stdlib_context(self):
    """Check the contexts produced by ssl._create_stdlib_context()."""
    def verify(ctx, protocol, verify_mode, hostname_checked=None):
        # hostname_checked=None means check_hostname is not asserted.
        self.assertEqual(ctx.protocol, protocol)
        self.assertEqual(ctx.verify_mode, verify_mode)
        if hostname_checked is not None:
            if hostname_checked:
                self.assertTrue(ctx.check_hostname)
            else:
                self.assertFalse(ctx.check_hostname)
        self._assert_context_options(ctx)

    # No arguments
    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    # Explicit protocol
    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol, verification and hostname checking
    strict_ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                            cert_reqs=ssl.CERT_REQUIRED,
                                            check_hostname=True)
    verify(strict_ctx,
           ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED, hostname_checked=True)

    # CLIENT_AUTH purpose
    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
Christian Heimes1aa9a752013-12-02 02:41:19 +01001541 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001545
Christian Heimese82c0342017-09-15 20:29:57 +02001546 # Auto set CERT_REQUIRED
1547 ctx.check_hostname = True
1548 self.assertTrue(ctx.check_hostname)
1549 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1550 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001551 ctx.verify_mode = ssl.CERT_REQUIRED
1552 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001553 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001554
Christian Heimese82c0342017-09-15 20:29:57 +02001555 # Changing verify_mode does not affect check_hostname
1556 ctx.check_hostname = False
1557 ctx.verify_mode = ssl.CERT_NONE
1558 ctx.check_hostname = False
1559 self.assertFalse(ctx.check_hostname)
1560 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1561 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001562 ctx.check_hostname = True
1563 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001564 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1565
1566 ctx.check_hostname = False
1567 ctx.verify_mode = ssl.CERT_OPTIONAL
1568 ctx.check_hostname = False
1569 self.assertFalse(ctx.check_hostname)
1570 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1571 # keep CERT_OPTIONAL
1572 ctx.check_hostname = True
1573 self.assertTrue(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
1576 # Cannot set CERT_NONE with check_hostname enabled
1577 with self.assertRaises(ValueError):
1578 ctx.verify_mode = ssl.CERT_NONE
1579 ctx.check_hostname = False
1580 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001581 ctx.verify_mode = ssl.CERT_NONE
1582 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001583
Christian Heimes5fe668c2016-09-12 00:01:11 +02001584 def test_context_client_server(self):
1585 # PROTOCOL_TLS_CLIENT has sane defaults
1586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1587 self.assertTrue(ctx.check_hostname)
1588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 # PROTOCOL_TLS_SERVER has different but also sane defaults
1591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1592 self.assertFalse(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1594
Christian Heimes4df60f12017-09-15 20:26:05 +02001595 def test_context_custom_class(self):
1596 class MySSLSocket(ssl.SSLSocket):
1597 pass
1598
1599 class MySSLObject(ssl.SSLObject):
1600 pass
1601
1602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1603 ctx.sslsocket_class = MySSLSocket
1604 ctx.sslobject_class = MySSLObject
1605
1606 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1607 self.assertIsInstance(sock, MySSLSocket)
1608 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1609 self.assertIsInstance(obj, MySSLObject)
1610
Antoine Pitrou152efa22010-05-16 18:19:27 +00001611
class SSLErrorTests(unittest.TestCase):
    """Tests for SSLError, its subclasses and related argument errors."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # the same holds for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = error_class(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            ctx.load_dh_params(CERTFILE)
        err = caught.exception
        self.assertEqual(err.library, 'PEM')
        self.assertEqual(err.reason, 'NO_START_LINE')
        message = str(err)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            raw_client = socket.socket()
            raw_client.connect(listener.getsockname())
            raw_client.setblocking(False)
            # The listener never accepts or answers, so a non-blocking
            # handshake is expected to report "want read".
            with ctx.wrap_socket(raw_client, False,
                                 do_handshake_on_connect=False) as client:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    client.do_handshake()
                message = str(caught.exception)
                self.assertTrue(
                    message.startswith(
                        "The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # An empty hostname and one with a leading dot are both rejected.
        ctx = ssl.create_default_context()
        for bad_hostname in ("", ".example.org"):
            with self.assertRaises(ValueError):
                ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                             server_hostname=bad_hostname)
1663
1664
class MemoryBIOTests(unittest.TestCase):
    """Tests for the read/write/eof/pending behaviour of ssl.MemoryBIO."""

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # A single write is returned by one read; then the BIO is empty.
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads consume the data piecewise.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # After write_eof(), eof stays false until all data is consumed.
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Each one-byte read lowers the pending count by one ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... and each one-byte write raises it by one.
        for buffered in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # bytes, bytearray and memoryview are all accepted by write().
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        buf.write(bytearray(b'bar'))
        self.assertEqual(buf.read(), b'bar')
        buf.write(memoryview(b'baz'))
        self.assertEqual(buf.read(), b'baz')

    def test_error_types(self):
        # str, None, bool and int are all rejected by write().
        buf = ssl.MemoryBIO()
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_a_buffer)
1726
1727
class SSLObjectTests(unittest.TestCase):
    """Tests for direct use of ssl.SSLObject."""

    def test_private_init(self):
        # Calling the SSLObject constructor directly must raise TypeError
        # with a message mentioning "public constructor".
        shared_bio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(shared_bio, shared_bio)
1733
1734
Martin Panter3840b2a2016-03-27 01:53:46 +00001735class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001736 """Tests that connect to a simple server running in the background"""
1737
def setUp(self):
    """Start a ThreadedEchoServer and record its address for the test."""
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    # Enter the server's context manually and register the matching
    # exit as a cleanup, so it runs when the test finishes.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001743
def test_connect(self):
    """Connect without verification, then with the signing CA."""
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        # With CERT_NONE the peer certificate is reported as empty.
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001758
def test_connect_fail(self):
    """CERT_REQUIRED without any CA certificates makes connect() fail."""
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001768
def test_connect_ex(self):
    """connect_ex() returns 0 and leaves a verified connection."""
    # Issue #11326: check connect_ex() implementation
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001777
def test_non_blocking_connect_ex(self):
    """Non-blocking connect_ex() followed by a select()-driven handshake."""
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], 5.0)
    # Non-blocking handshake: retry until it stops asking for more I/O
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001803
def test_connect_with_context(self):
    """Connect through sockets wrapped by a separately created context."""
    # Same as test_connect, but with a separately created context
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def new_socket():
        return socket.socket(socket.AF_INET)

    with ctx.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
    # Same with a server hostname
    with ctx.wrap_socket(new_socket(), server_hostname="dummy") as conn:
        conn.connect(self.server_addr)
    ctx.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    ctx.load_verify_locations(SIGNING_CA)
    with ctx.wrap_socket(new_socket()) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1821
def test_connect_with_context_fail(self):
    """A CERT_REQUIRED context without CA certificates fails to connect."""
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    strict_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    strict_ctx.verify_mode = ssl.CERT_REQUIRED
    conn = strict_ctx.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001832
def test_connect_capath(self):
    """Verify the server certificate via `capath`, given as str and bytes."""
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The second pass repeats the check with a bytes `capath` argument.
    for ca_directory in (CAPATH, BYTES_CAPATH):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=ca_directory)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001854
def test_connect_cadata(self):
    """Verify the server certificate via `cadata`, as PEM and as DER."""
    with open(SIGNING_CA) as ca_file:
        pem_text = ca_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)

    # First pass uses the PEM text, second pass the same cert as DER.
    for ca_data in (pem_text, der_bytes):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        # TODO: fix TLSv1.3 support
        ctx.options |= ssl.OP_NO_TLSv1_3
        ctx.load_verify_locations(cadata=ca_data)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001879
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Closing a makefile() object must not close or pin the socket fd."""
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    wrapped.connect(self.server_addr)
    raw_fd = wrapped.fileno()
    file_obj = wrapped.makefile()
    file_obj.close()
    # The fd is still open
    os.read(raw_fd, 0)
    # Closing the SSL socket should close the fd too
    wrapped.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(raw_fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001898
def test_non_blocking_handshake(self):
    # Drive the TLS handshake by hand on a non-blocking socket, waiting
    # with select() whenever OpenSSL reports it needs to read or write.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)
    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            # Block until the peer has sent more handshake data.
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            # Block until the socket can take more outgoing data.
            select.select([], [tls_sock], [])
        else:
            handshake_done = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001919
def test_get_server_certificate(self):
    # Fetch the background server's certificate, without and then with
    # verification against the signing CA.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001922
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001927
def test_ciphers(self):
    # Well-known cipher strings must allow a connection to be made.
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as s:
            s.connect(self.server_addr)
    # A nonsense cipher string must be rejected.
    # Error checking can happen at instantiation or when connecting
    bogus_spec = "^$:,;?*'dorothyx"
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as sock:
            s = test_wrap_socket(sock,
                                 cert_reqs=ssl.CERT_NONE,
                                 ciphers=bogus_spec)
            s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001941
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    # Nothing is reported before a handshake has needed a CA.
    self.assertEqual(context.get_ca_certs(), [])
    plain_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(plain_sock, server_hostname='localhost') as s:
        s.connect(self.server_addr)
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert)
    # After the verified connection exactly one CA cert is reported.
    self.assertEqual(len(context.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001953
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    def make_context():
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(capath=CAPATH)
        return ctx

    initial_ctx = make_context()
    replacement_ctx = make_context()
    plain_sock = socket.socket(socket.AF_INET)
    with initial_ctx.wrap_socket(plain_sock, server_hostname='localhost') as ss:
        ss.connect(self.server_addr)
        # Both the Python-level socket and the low-level object report
        # the context the socket was created from...
        self.assertIs(ss.context, initial_ctx)
        self.assertIs(ss._sslobj.context, initial_ctx)
        # ...and both follow an assignment to the context attribute.
        ss.context = replacement_ctx
        self.assertIs(ss.context, replacement_ctx)
        self.assertIs(ss._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001969
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    # A simple IO loop. Call func(*args) repeatedly; depending on the
    # error we get (WANT_READ or WANT_WRITE) move data between the socket
    # and the BIOs, then retry.  Returns whatever func finally returns.
    # The only keyword consulted is 'timeout' (seconds, default 10);
    # keyword arguments are not forwarded to func.
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        # Error code of this attempt, or None when func succeeded.
        pending = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            pending = exc.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        sock.sendall(outgoing.read())
        # If there's no error, we're done. For WANT_READ, we need to get
        # data from the socket and put it in the incoming BIO.
        if pending is None:
            break
        if pending == ssl.SSL_ERROR_WANT_READ:
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                # Peer closed the connection: tell the BIO no more data
                # will arrive.
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return ret
2006
def test_bio_handshake(self):
    # Handshake and shut down a TLS session through memory BIOs, checking
    # what the SSLObject reports before and after the handshake.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    ctx.load_verify_locations(SIGNING_CA)
    sslobj = ctx.wrap_bio(incoming, outgoing,
                          server_side=False,
                          server_hostname=SIGNED_CERTFILE_HOSTNAME)
    check_channel_binding = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # Before the handshake nothing has been negotiated yet.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertRaises(ValueError, sslobj.getpeercert)
    if check_channel_binding:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # After the handshake the session details are available.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if check_channel_binding:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # Writing after the shutdown attempt must fail.
    self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2040
def test_bio_read_write_data(self):
    # Send a request through memory BIOs and check that the server's
    # lower-cased echo comes back.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_NONE
    sslobj = ctx.wrap_bio(incoming, outgoing, False)

    def pump(func, *args):
        # Run one SSLObject operation to completion over the socket.
        return self.ssl_io_loop(sock, incoming, outgoing, func, *args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002056
2057
class NetworkedTests(unittest.TestCase):
    # Tests that talk to hosts outside the test process.

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            tls_sock = test_wrap_socket(socket.socket(socket.AF_INET),
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        do_handshake_on_connect=False)
            self.addCleanup(tls_sock.close)
            # A timeout this short is expected to expire before the
            # connection can be established.
            tls_sock.settimeout(1e-7)
            result = tls_sock.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2079
Martin Panter3840b2a2016-03-27 01:53:46 +00002080
def _test_get_server_certificate(test, host, port, cert=None):
    # Fetch the certificate of host:port twice -- first with no ca_certs,
    # then with ca_certs=cert -- and fail *test* if either call returns
    # nothing.
    address = (host, port)

    unverified_pem = ssl.get_server_certificate(address)
    if not unverified_pem:
        test.fail("No server certificate on %s:%s!" % address)

    verified_pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not verified_pem:
        test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, verified_pem))
2091
def _test_get_server_certificate_fail(test, host, port):
    # Fetching the certificate of host:port with ca_certs=CERTFILE is
    # expected to raise SSLError; getting a certificate back fails *test*.
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % err)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2101
2102
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002103from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002104
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used by the SSL tests.

    Accepts one connection at a time and hands it to a ConnectionHandler,
    which replies to each message with its lower-cased form and understands
    a few control messages (b'over', b'STARTTLS', b'ENDTLS',
    b'CB tls-unique').  Usable as a context manager: entering starts the
    thread and waits until it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # SSL-wrapped socket while TLS is active, else None.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection in TLS (server side).

            Returns True on success.  On failure the error text is recorded
            in server.conn_errors, the server is asked to stop, the
            connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if active, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS connection if active, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if active, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an OSError."""
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # In STARTTLS mode the client may disconnect while
                        # the connection is still plaintext; there is then
                        # no TLS layer to unwrap, so just close the socket.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        # A ready-made context takes precedence; otherwise one is built
        # from the individual settings.
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results, appended to by ConnectionHandler.wrap_conn.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        # Wait until run() has started listening.
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections one at a time until stop() is called."""
        # Short accept() timeout so that stop() is noticed promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        try:
            while self.active:
                try:
                    newconn, connaddr = self.sock.accept()
                    if support.verbose and self.chatty:
                        sys.stdout.write(' server: new connection from '
                                         + repr(connaddr) + '\n')
                    handler = self.ConnectionHandler(self, newconn, connaddr)
                    handler.start()
                    handler.join()
                except socket.timeout:
                    pass
                except KeyboardInterrupt:
                    self.stop()
        finally:
            # Release the listening socket even if the loop is left through
            # an unexpected exception.
            self.sock.close()

    def stop(self):
        """Ask the accept loop to finish after its current iteration."""
        self.active = False
2323
class AsyncoreEchoServer(threading.Thread):
    """Echo server driven by an asyncore loop running in its own thread.

    Usable as a context manager: entering starts the thread and waits
    until its loop is about to run, leaving stops and joins it.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        # Listening dispatcher: creates a ConnectionHandler per client.

        class ConnectionHandler(asyncore.dispatcher_with_send):
            # Per-connection dispatcher: finishes the TLS handshake without
            # blocking, then echoes received data lower-cased.

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data the SSL layer has already buffered before
                # reporting the dispatcher as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                received = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(received))
                if received:
                    self.send(received.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start(self, flag=None):
        # *flag*, if given, is set by run() just before the loop starts.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Deliberately best-effort: whatever escapes the asyncore
                # loop is ignored and the loop is re-entered.
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002441
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect to it with
    a socket wrapped by *client_context*, and check that *indata* -- sent
    as bytes, bytearray and memoryview -- is echoed back lower-cased.

    Returns a dict describing the client connection (cipher, version,
    session, ...) plus the server's recorded ALPN/NPN selections and
    shared ciphers.  Raises AssertionError on a wrong echo.
    """
    # NOTE(review): the server is always created with
    # connectionchatty=False; the *connectionchatty* argument only controls
    # the client-side messages below.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the server's handler to close this connection.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Collect connection details while the socket is still open.
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
                'session_reused': s.session_reused,
                'session': s.session,
            }
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002491
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Connect a *client_protocol* client to a *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false one
    means it must fail.  When *expect_success* is a string, it must also
    equal the protocol version the connection ended up using.
    *certsreqs* (default CERT_NONE) is applied as verify_mode to both
    sides; the *_options* values are OR-ed into each context's options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Combinations expected to fail are printed in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        tolerated = (not expect_success) and e.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if (expect_success is not True
                and expect_success != stats['version']):
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2550
2551
2552class ThreadedTests(unittest.TestCase):
2553
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # The role-specific protocols are exercised separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Correct pairing: client context on the client, server context on
    # the server.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Wrong pairings: each entry is (client label, server label, context
    # used by the client, context used by the server, extra keyword args).
    # NOTE(review): the last entry is labelled TLS_CLIENT/TLS_CLIENT but
    # passes server_context as the client, exactly like the first entry
    # minus sni_name -- confirm whether that is intended.
    wrong_pairings = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in wrong_pairings:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2601
def test_getpeercert(self):
    """Check getpeercert() before and after the TLS handshake.

    The client socket is wrapped with do_handshake_on_connect=False, so
    getpeercert() is expected to raise ValueError right after connect().
    Once do_handshake() has run, the returned certificate must be
    non-empty, carry the expected organizationName entry in its subject,
    and have a notBefore time earlier than its notAfter time.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)
    with echo_server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as conn:
        conn.connect((HOST, echo_server.port))
        # The handshake has not been performed yet, so asking for the
        # peer certificate must fail with ValueError.
        with self.assertRaises(ValueError):
            conn.getpeercert()
        conn.do_handshake()
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
        negotiated = conn.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(peer_cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(negotiated) + '.\n')
        if 'subject' not in peer_cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(peer_cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in peer_cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        # Both validity bounds must be present and correctly ordered.
        self.assertIn('notBefore', peer_cert)
        self.assertIn('notAfter', peer_cert)
        not_before = ssl.cert_time_to_seconds(peer_cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(peer_cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002637
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL.

    Three connections are made against a fresh echo server each time:
    with default verify flags (must succeed), with
    VERIFY_CRL_CHECK_LEAF set but no CRL loaded (must fail with
    "certificate verify failed"), and finally after loading CRLFILE
    (must succeed again).
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def new_server():
        # A new server is started for every connection attempt.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def new_client():
        return client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)

    def assert_connects():
        # Connect and require a non-empty peer certificate.
        echo_server = new_server()
        with echo_server, new_client() as conn:
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_X509_TRUSTED_FIRST may be missing from the ssl module; in
    # that case it contributes nothing to the expected default flags.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    assert_connects()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    echo_server = new_server()
    with echo_server, new_client() as conn:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            conn.connect((HOST, echo_server.port))

    # now load a CRL file.  The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    assert_connects()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002679
def test_check_hostname(self):
    """Check hostname verification on the client side.

    Covers three cases: the matching hostname connects and yields a
    peer certificate, a non-matching hostname makes connect() raise
    ssl.CertificateError, and omitting server_hostname makes
    wrap_socket() raise ValueError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def make_server():
        # Each scenario gets its own echo server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    echo_server = make_server()
    with echo_server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as conn:
        conn.connect((HOST, echo_server.port))
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    echo_server = make_server()
    with echo_server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as conn:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            conn.connect((HOST, echo_server.port))

    # missing server_hostname arg should cause an exception, too
    echo_server = make_server()
    with echo_server, socket.socket() as raw_sock:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(raw_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002712
def test_ecc_cert(self):
    """Connect to a server that only presents an ECC certificate.

    The client trusts SIGNING_CA and restricts its cipher list so that
    RSA authentication is excluded.  The connection must succeed, yield
    a peer certificate and, unless TLS 1.3 was negotiated, use an
    ECDHE-ECDSA cipher suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers(
        'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
        'ECDHE:ECDSA:!NULL:!aRSA'
    )
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The cipher list above also admits TLS 1.3 suites, whose
            # names do not start with a key-exchange/authentication
            # pair, so the prefix is only checked for older protocols.
            # assertEqual is required here: assertTrue(x, y) would
            # treat the expected value as the failure message and only
            # test that the list is non-empty.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2736
def test_dual_rsa_ecc(self):
    """Connect to a server holding both an ECC and an RSA certificate.

    The client disables TLS 1.3 and excludes RSA authentication from
    its cipher list.  The connection must succeed, yield a peer
    certificate and use an ECDHE-ECDSA cipher suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertEqual, not assertTrue: the latter would take the
            # expected value as its message argument and only check
            # that the list is non-empty.  split() returns a list, so
            # the expected value is a list as well.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2762
def test_check_hostname_idn(self):
    """Check hostname verification with internationalized names.

    Each entry pairs a server_hostname as passed to wrap_socket()
    (Unicode str, ASCII-compatible str, or bytes) with the value the
    socket's server_hostname attribute is expected to report.  Every
    such name must connect and yield a peer certificate; an unrelated
    hostname must make connect() raise ssl.CertificateError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)
    # TODO: fix TLSv1.3 support
    server_context.options |= ssl.OP_NO_TLSv1_3

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # ASCII-compatible forms that the socket is expected to report.
    ace_idn = 'xn--knig-5qa.idn.pythontest.net'
    ace_idna2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    ace_idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'

    # correct hostname should verify, when specified in several
    # different ways
    idn_hostnames = [
        ('könig.idn.pythontest.net', ace_idn),
        (ace_idn, ace_idn),
        (ace_idn.encode('ascii'), ace_idn),

        ('königsgäßchen.idna2003.pythontest.net', ace_idna2003),
        (ace_idna2003, ace_idna2003),
        (ace_idna2003.encode('ascii'), ace_idna2003),

        # The Unicode spelling of the idna2008 name is left disabled:
        # ('königsgäßchen.idna2008.pythontest.net', ace_idna2008),
        (ace_idna2008, ace_idna2008),
        (ace_idna2008.encode('ascii'), ace_idna2008),
    ]
    for given_name, reported_name in idn_hostnames:
        echo_server = ThreadedEchoServer(context=server_context, chatty=True)
        with echo_server, context.wrap_socket(
                socket.socket(), server_hostname=given_name) as conn:
            # The attribute is checked both before and after connecting.
            self.assertEqual(conn.server_hostname, reported_name)
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertEqual(conn.server_hostname, reported_name)
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as conn:
        with self.assertRaises(ssl.CertificateError):
            conn.connect((HOST, echo_server.port))
2820
def test_wrong_cert(self):
    """Connecting with a client certificate the server rejects.

    The server is configured with CERT_REQUIRED and the client loads
    WRONG_CERT.  connect() must fail, either with ssl.SSLError or with
    an OSError whose errno is ECONNRESET; a successful connection is a
    test failure.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert
    client_context.load_cert_chain(WRONG_CERT)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TODO: fix TLSv1.3 support
    # With TLS 1.3, test fails with exception in server thread
    server_context.options |= ssl.OP_NO_TLSv1_3

    echo_server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with echo_server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as conn:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                conn.connect((HOST, echo_server.port))
            except ssl.SSLError as ssl_err:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_err)
            except OSError as os_err:
                # Only a connection reset is an acceptable OS-level error.
                if os_err.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % os_err)
            else:
                self.fail("Use of invalid cert should have failed!")
2858
def test_rude_shutdown(self):
    """A server that closes abruptly must make the client wrap fail.

    A listener thread accepts one plain TCP connection and immediately
    closes both the accepted and the listening socket.  The client
    then tries to wrap its socket, which must raise OSError.
    """
    ready = threading.Event()
    gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread: wait in accept() until the main thread
        # connects, rudely close everything, then signal `gone`.
        listen_sock.listen()
        ready.set()
        accepted, _peer = listen_sock.accept()
        accepted.close()
        listen_sock.close()
        gone.set()

    def connector():
        # Runs in the main thread: connect once the listener is ready
        # and attempt the wrap only after the server side is closed.
        ready.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            gone.wait()
            try:
                wrapped = test_wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        # Always reap the listener thread, even if connector() raised.
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002899
def test_ssl_cert_verify_error(self):
    """Check the details carried by a certificate verification error.

    The client context has no CA certificates loaded, so connecting to
    a server that presents SIGNED_CERTFILE must raise ssl.SSLError.
    The exception must be an SSLCertVerificationError whose
    verify_code, verify_message and repr() describe the failure.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the failure mandatory; a bare
            # try/except would let the test pass silently if connect()
            # unexpectedly succeeded.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
2922
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Try an SSLv2 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")

    # The first argument is PROTOCOL_SSLv2 in every combination below;
    # only the remaining arguments vary.
    with_sslv2 = functools.partial(try_protocol_combo, ssl.PROTOCOL_SSLv2)

    # Same protocol as second argument: without a fourth argument, then
    # with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        with_sslv2(ssl.PROTOCOL_SSLv2, True, *cert_args)

    with_sslv2(ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        with_sslv2(ssl.PROTOCOL_SSLv3, False)
    with_sslv2(ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        with_sslv2(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        with_sslv2(ssl.PROTOCOL_TLS, False, client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02002946
@skip_if_broken_ubuntu_ssl
def test_PROTOCOL_TLS(self):
    """Try a PROTOCOL_TLS (SSLv23) server against various clients."""
    if support.verbose:
        sys.stdout.write("\n")

    # The first argument is PROTOCOL_TLS in every combination below.
    with_tls = functools.partial(try_protocol_combo, ssl.PROTOCOL_TLS)
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            with_tls(ssl.PROTOCOL_SSLv2, True)
        except OSError as err:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(err))

    # The same three combinations are run without a fourth argument,
    # then with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if has_sslv3:
            with_tls(ssl.PROTOCOL_SSLv3, False, *cert_args)
        with_tls(ssl.PROTOCOL_TLS, True, *cert_args)
        with_tls(ssl.PROTOCOL_TLSv1, 'TLSv1', *cert_args)

    # Server with specific SSL options
    if has_sslv3:
        with_tls(ssl.PROTOCOL_SSLv3, False,
                 server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    with_tls(ssl.PROTOCOL_TLS, True,
             server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    with_tls(ssl.PROTOCOL_TLSv1, False,
             server_options=ssl.OP_NO_TLSv1)
2985
2986
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Try an SSLv3 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")

    sslv3 = ssl.PROTOCOL_SSLv3
    # The first argument is PROTOCOL_SSLv3 in every combination below.
    with_sslv3 = functools.partial(try_protocol_combo, sslv3)

    # Same protocol as second argument: without a fourth argument, then
    # with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        with_sslv3(sslv3, 'SSLv3', *cert_args)

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        with_sslv3(ssl.PROTOCOL_SSLv2, False)
    with_sslv3(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_SSLv3)
    with_sslv3(ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        with_sslv3(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_SSLv2)
3006
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Try a TLSv1 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1 = ssl.PROTOCOL_TLSv1
    # The first argument is PROTOCOL_TLSv1 in every combination below.
    with_tlsv1 = functools.partial(try_protocol_combo, tlsv1)

    # Same protocol as second argument: without a fourth argument, then
    # with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        with_tlsv1(tlsv1, 'TLSv1', *cert_args)

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        with_tlsv1(ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        with_tlsv1(ssl.PROTOCOL_SSLv3, False)
    with_tlsv1(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_TLSv1)
3021
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Try TLSv1.1 combinations, including against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    # Shorthand for the combinations whose first argument is
    # PROTOCOL_TLSv1_1.
    with_tlsv1_1 = functools.partial(try_protocol_combo, tlsv1_1)

    with_tlsv1_1(tlsv1_1, 'TLSv1.1')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        with_tlsv1_1(ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        with_tlsv1_1(ssl.PROTOCOL_SSLv3, False)
    with_tlsv1_1(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_TLSv1_1)

    # Mixed pairings where TLSv1.1 also appears as the second argument.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    with_tlsv1_1(ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3041
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Try TLSv1.2 combinations, including against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")

    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    # Shorthand for the combinations whose first argument is
    # PROTOCOL_TLSv1_2.
    with_tlsv1_2 = functools.partial(try_protocol_combo, tlsv1_2)
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    with_tlsv1_2(tlsv1_2, 'TLSv1.2',
                 server_options=no_legacy_ssl,
                 client_options=no_legacy_ssl)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        with_tlsv1_2(ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        with_tlsv1_2(ssl.PROTOCOL_SSLv3, False)
    with_tlsv1_2(ssl.PROTOCOL_TLS, False, client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 paired with each older fixed TLS version, in both
    # argument orders, is expected to fail.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        with_tlsv1_2(older, False)
        try_protocol_combo(older, tlsv1_2, False)
3065
def test_starttls(self):
    """Switch a connection from clear text to TLS and back again.

    A fixed message sequence is sent to an echo server started with
    starttls_server=True.  When the reply to b"STARTTLS" begins with
    b"ok" the socket is wrapped; when the reply to b"ENDTLS" begins
    with b"ok" it is unwrapped again.  Finally b"over\n" is sent and
    the connection is closed through whichever object is active.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    echo_server = ThreadedEchoServer(CERTFILE,
                                     starttls_server=True,
                                     chatty=True,
                                     connectionchatty=True)
    # `wrapped` selects which object talks to the server: the TLS
    # wrapper `secure` when True, the plain socket `plain` otherwise.
    wrapped = False
    with echo_server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, echo_server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for request in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % request)
            if wrapped:
                secure.write(request)
                reply = secure.read()
            else:
                plain.send(request)
                reply = plain.recv(1024)
            answer = reply.strip().lower()
            acknowledged = answer.startswith(b"ok")
            if request == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % answer)
                secure = test_wrap_socket(plain)
                wrapped = True
            elif request == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % answer)
                plain = secure.unwrap()
                wrapped = False
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % answer)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close through whichever object is active.
        if wrapped:
            secure.write(b"over\n")
            secure.close()
        else:
            plain.send(b"over\n")
            plain.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003122
def test_socketserver(self):
    """Fetch a file over HTTPS from a socketserver-based server.

    The contents of CERTFILE are read from disk and then requested from
    the HTTPS server under the file's base name; both byte strings
    must be equal.
    """
    https_server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as cert_file:
        on_disk = cert_file.read()
    fetched = ''
    # now fetch the same data from the HTTPS server
    file_name = os.path.basename(CERTFILE)
    url = 'https://localhost:%d/%s' % (https_server.port, file_name)
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response is closed when the with block exits, whether or not
    # reading succeeded.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if dlen and (int(dlen) > 0):
            fetched = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), https_server))
    self.assertEqual(on_disk, fetched)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003148
def test_asyncore_server(self):
    """Round-trip one message through the asyncore-based echo server.

    Sends b"FOO\n" over a wrapped socket and expects the lower-cased
    bytes back, then sends b"over\n" and closes the connection.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    expected = indata.lower()
    echo_server = AsyncoreEchoServer(CERTFILE)
    with echo_server:
        conn = test_wrap_socket(socket.socket())
        conn.connect(('127.0.0.1', echo_server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        conn.write(indata)
        outdata = conn.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != expected:
            # Only the first 20 bytes of each side are shown.
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        conn.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        conn.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003178 def test_recv_send(self):
3179 """Test recv(), send() and friends."""
3180 if support.verbose:
3181 sys.stdout.write("\n")
3182
3183 server = ThreadedEchoServer(CERTFILE,
3184 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003185 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003186 cacerts=CERTFILE,
3187 chatty=True,
3188 connectionchatty=False)
3189 with server:
3190 s = test_wrap_socket(socket.socket(),
3191 server_side=False,
3192 certfile=CERTFILE,
3193 ca_certs=CERTFILE,
3194 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003195 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003196 s.connect((HOST, server.port))
3197 # helper methods for standardising recv* method signatures
3198 def _recv_into():
3199 b = bytearray(b"\0"*100)
3200 count = s.recv_into(b)
3201 return b[:count]
3202
3203 def _recvfrom_into():
3204 b = bytearray(b"\0"*100)
3205 count, addr = s.recvfrom_into(b)
3206 return b[:count]
3207
3208 # (name, method, expect success?, *args, return value func)
3209 send_methods = [
3210 ('send', s.send, True, [], len),
3211 ('sendto', s.sendto, False, ["some.address"], len),
3212 ('sendall', s.sendall, True, [], lambda x: None),
3213 ]
3214 # (name, method, whether to expect success, *args)
3215 recv_methods = [
3216 ('recv', s.recv, True, []),
3217 ('recvfrom', s.recvfrom, False, ["some.address"]),
3218 ('recv_into', _recv_into, True, []),
3219 ('recvfrom_into', _recvfrom_into, False, []),
3220 ]
3221 data_prefix = "PREFIX_"
3222
3223 for (meth_name, send_meth, expect_success, args,
3224 ret_val_meth) in send_methods:
3225 indata = (data_prefix + meth_name).encode('ascii')
3226 try:
3227 ret = send_meth(indata, *args)
3228 msg = "sending with {}".format(meth_name)
3229 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3230 outdata = s.read()
3231 if outdata != indata.lower():
3232 self.fail(
3233 "While sending with <<{name:s}>> bad data "
3234 "<<{outdata:r}>> ({nout:d}) received; "
3235 "expected <<{indata:r}>> ({nin:d})\n".format(
3236 name=meth_name, outdata=outdata[:20],
3237 nout=len(outdata),
3238 indata=indata[:20], nin=len(indata)
3239 )
3240 )
3241 except ValueError as e:
3242 if expect_success:
3243 self.fail(
3244 "Failed to send with method <<{name:s}>>; "
3245 "expected to succeed.\n".format(name=meth_name)
3246 )
3247 if not str(e).startswith(meth_name):
3248 self.fail(
3249 "Method <<{name:s}>> failed with unexpected "
3250 "exception message: {exp:s}\n".format(
3251 name=meth_name, exp=e
3252 )
3253 )
3254
3255 for meth_name, recv_meth, expect_success, args in recv_methods:
3256 indata = (data_prefix + meth_name).encode('ascii')
3257 try:
3258 s.send(indata)
3259 outdata = recv_meth(*args)
3260 if outdata != indata.lower():
3261 self.fail(
3262 "While receiving with <<{name:s}>> bad data "
3263 "<<{outdata:r}>> ({nout:d}) received; "
3264 "expected <<{indata:r}>> ({nin:d})\n".format(
3265 name=meth_name, outdata=outdata[:20],
3266 nout=len(outdata),
3267 indata=indata[:20], nin=len(indata)
3268 )
3269 )
3270 except ValueError as e:
3271 if expect_success:
3272 self.fail(
3273 "Failed to receive with method <<{name:s}>>; "
3274 "expected to succeed.\n".format(name=meth_name)
3275 )
3276 if not str(e).startswith(meth_name):
3277 self.fail(
3278 "Method <<{name:s}>> failed with unexpected "
3279 "exception message: {exp:s}\n".format(
3280 name=meth_name, exp=e
3281 )
3282 )
3283 # consume data
3284 s.read()
3285
3286 # read(-1, buffer) is supported, even though read(-1) is not
3287 data = b"data"
3288 s.send(data)
3289 buffer = bytearray(len(data))
3290 self.assertEqual(s.read(-1, buffer), len(data))
3291 self.assertEqual(buffer, data)
3292
Christian Heimes888bbdc2017-09-07 14:18:21 -07003293 # sendall accepts bytes-like objects
3294 if ctypes is not None:
3295 ubyte = ctypes.c_ubyte * len(data)
3296 byteslike = ubyte.from_buffer_copy(data)
3297 s.sendall(byteslike)
3298 self.assertEqual(s.read(), data)
3299
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 # Make sure sendmsg et al are disallowed to avoid
3301 # inadvertent disclosure of data and/or corruption
3302 # of the encrypted data stream
3303 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3304 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3305 self.assertRaises(NotImplementedError,
3306 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307 s.write(b"over\n")
3308
3309 self.assertRaises(ValueError, s.recv, -1)
3310 self.assertRaises(ValueError, s.read, -1)
3311
3312 s.close()
3313
def test_recv_zero(self):
    # Zero-length receives on a TLS socket must return empty results,
    # both after data has been sent and on a non-blocking socket.
    echo_server = ThreadedEchoServer(CERTFILE)
    # Entered by hand so the server's shutdown is tied to test cleanup
    # rather than to a ``with`` block.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)

    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv(0) and read(0) yield nothing; the echoed payload is still
    # returned in full by the following unbounded read().
    payload = b"data"
    tls_sock.send(payload)
    for zero_length_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_length_read(0), b"")
    self.assertEqual(tls_sock.read(), payload)

    # With nothing sent by the peer, a zero-length receive on a
    # non-blocking socket must return immediately.
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    empty_buffer = bytearray()
    self.assertEqual(tls_sock.recv_into(empty_buffer), 0)
3333
def test_nonblocking_send(self):
    # send() on a non-blocking TLS socket must eventually raise
    # SSLWantWriteError or SSLWantReadError instead of blocking.
    echo_server = ThreadedEchoServer(
        CERTFILE,
        certreqs=ssl.CERT_NONE,
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        cacerts=CERTFILE,
        chatty=True,
        connectionchatty=False,
    )
    with echo_server:
        client = test_wrap_socket(
            socket.socket(),
            server_side=False,
            certfile=CERTFILE,
            ca_certs=CERTFILE,
            cert_reqs=ssl.CERT_NONE,
            ssl_version=ssl.PROTOCOL_TLS_CLIENT,
        )
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # Keep pushing data: once the buffers are full, the next send()
        # would have to block, which a non-blocking socket reports by
        # raising one of the "want" errors.
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Switch back to blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
3363
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout.
    # The listener below accepts TCP connections but never answers the
    # handshake, so the client side can only finish by timing out.
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        listener.listen()
        listening.set()
        accepted = []
        # ``finish`` is rebound by the enclosing test when the client
        # side is done; the short select() timeout bounds how long the
        # loop takes to notice.
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Hold a reference so the connection stays open instead
                # of being closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    server_thread = threading.Thread(target=serve)
    server_thread.start()
    listening.wait()

    try:
        # Case 1: wrap an already-connected socket; the handshake runs
        # inside test_wrap_socket() and must time out.
        try:
            client = socket.socket(socket.AF_INET)
            client.settimeout(0.2)
            client.connect((host, port))
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   test_wrap_socket, client)
        finally:
            client.close()

        # Case 2: wrap first, then connect; the handshake runs inside
        # connect() and must time out.
        try:
            client = socket.socket(socket.AF_INET)
            client = test_wrap_socket(client)
            client.settimeout(0.2)
            self.assertRaisesRegex(socket.timeout, "timed out",
                                   client.connect, (host, port))
        finally:
            client.close()
    finally:
        finish = True
        server_thread.join()
        listener.close()
3412
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)

    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Signal the client, then block in accept() and wait for the
        # connection to be closed by the other side.
        ready.set()
        remote, peer = listener.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()

    # The client waits for the listener to be set up, connects, records
    # its own address and disconnects.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()

    worker.join()
    remote.close()
    listener.close()

    # Sanity checks: accept() returned an SSL socket, and the peer
    # address it reported is the client's local address.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3451
3452 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003453 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003454 with context.wrap_socket(socket.socket()) as sock:
3455 with self.assertRaises(OSError) as cm:
3456 sock.getpeercert()
3457 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3458
3459 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003460 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003461 with context.wrap_socket(socket.socket()) as sock:
3462 with self.assertRaises(OSError) as cm:
3463 sock.do_handshake()
3464 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3465
def test_default_ciphers(self):
    # A client restricted to DES ciphers must be refused by a server
    # using its default cipher list.
    weak_client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        weak_client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")

    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS,
                                     chatty=False)
    with echo_server as server:
        with weak_client_ctx.wrap_socket(socket.socket()) as client:
            with self.assertRaises(OSError):
                client.connect((HOST, server.port))
    # Checked after the server block has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3480
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is no TLS session at all.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # Expect TLSv1.3 only on a genuine OpenSSL >= 1.1.1 built
            # with TLS 1.3 enabled.  A raw OPENSSL_VERSION_INFO test
            # would also match LibreSSL (which reports 2.x) and OpenSSL
            # builds without TLS 1.3.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Leaving the ``with`` block closed the socket.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3504
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled on both ends, the handshake must
    # negotiate TLSv1.3 with one of the TLS 1.3 cipher suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    context.options |= (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    # Pre-release OpenSSL 1.1.1 reported the suites as "TLS13-...";
    # released versions use the "TLS_..." names.  Accept both.
    tls13_ciphers = {
        'TLS13-AES-256-GCM-SHA384',
        'TLS13-CHACHA20-POLY1305-SHA256',
        'TLS13-AES-128-GCM-SHA256',
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], tls13_ciphers)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003522
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check that minimum_version / maximum_version on the client and
    # server contexts decide the protocol version that is negotiated.
    client_context, server_context, hostname = testing_context()
    tls = ssl.TLSVersion

    def negotiated_version():
        # Handshake against a fresh echo server with the contexts as
        # currently configured; return the version agreed on.
        with ThreadedEchoServer(context=server_context) as server:
            with client_context.wrap_socket(
                    socket.socket(), server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                return s.version()

    # client TLSv1.0 to 1.2, server only TLSv1.2
    client_context.minimum_version = tls.TLSv1
    client_context.maximum_version = tls.TLSv1_2
    server_context.minimum_version = tls.TLSv1_2
    server_context.maximum_version = tls.TLSv1_2
    self.assertEqual(negotiated_version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = tls.TLSv1
    server_context.maximum_version = tls.TLSv1_1
    self.assertEqual(negotiated_version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = tls.TLSv1_2
    server_context.maximum_version = tls.TLSv1_2
    client_context.minimum_version = tls.TLSv1
    client_context.maximum_version = tls.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3561
3562
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 must negotiate SSLv3 with a server whose
    # minimum version has been lowered to SSLv3.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3576
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name, so keep the handshake below TLS 1.3.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")

    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3596
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    # TODO: fix TLSv1.3 support
    client_context.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version: the server replies to
            # the "CB tls-unique" command with the repr() of its own
            # binding data.
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the binding of THIS connection; re-checking
            # cb_data here would leave new_cb_data unvalidated.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003652
def test_compression(self):
    # The compression method reported for a default connection must be
    # None or one of the known method names.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context,
        server_context,
        chatty=True,
        connectionchatty=True,
        sni_name=hostname,
    )
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    allowed_methods = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed_methods)
3661
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both ends, no compression method may
    # be reported for the connection.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(
        client_context,
        server_context,
        chatty=True,
        connectionchatty=True,
        sni_name=hostname,
    )
    self.assertIs(stats['compression'], None)
3672
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is the cipher tuple; element 0 is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; indexing the name again would
        # show only its first character.
        self.fail("Non-DH cipher: " + cipher)
3688
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise set_ecdh_curve() on either side of the connection, then
    # with curves that do not match.
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_tls_1_0_or_1_1 = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    def handshake(client_ctx, server_ctx, sni):
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_tls_1_0_or_1_1
    handshake(client_context, server_context, hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_tls_1_0_or_1_1
    handshake(client_context, server_context, hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_tls_1_0_or_1_1
    try:
        handshake(client_context, server_context, hostname)
    except ssl.SSLError:
        pass
    else:
        # OpenSSL 1.0.2 does not fail although it should.
        if IS_OPENSSL_1_1_0:
            self.fail("mismatch curve did not fail")
3727
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context,
        server_context,
        chatty=True,
        connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3735
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_context, server_context, hostname = testing_context()
    # Only the server advertises protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(
        client_context,
        server_context,
        chatty=True,
        connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3745
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected against a fixed server list.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Keep the exception in place of the stats so the version
        # specific branch below can inspect it.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as exc:
            stats = exc

        handshake_error_expected = (
            expected is None
            and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The doubled %% leaves two placeholders that are filled in per
        # assertion with the actual result and the side it came from.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        seen_by_server = stats['server_alpn_protocols']
        if seen_by_server:
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3785
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(
        client_context,
        server_context,
        chatty=True,
        connectionchatty=True,
        sni_name=hostname,
    )
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3793
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected against a fixed server list.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be chosen)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)

        # The doubled %% leaves two placeholders that are filled in per
        # assertion with the actual result and the side it came from.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        seen_by_server = stats['server_npn_protocols']
        if seen_by_server:
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3819
def sni_contexts(self):
    # Build the contexts shared by the SNI tests.  Returns the tuple
    # (server_context, other_context, client_context): two server
    # contexts loaded with different certificates, and a client context
    # that trusts SIGNING_CA.
    def server_context_for(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_context_for(SIGNED_CERTFILE)
    other_context = server_context_for(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3828
def check_common_name(self, stats, name):
    # Assert that the subject of the peer certificate recorded in
    # ``stats`` contains a commonName entry equal to ``name``.
    subject = stats['peercert']['subject']
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, subject)
3832
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the name sent by the client and
    # may switch the connection to another context.
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch contexts only when the client
        # actually sent a server name.
        calls.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    def connect_with(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    stats = connect_with('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate presented is the one carrying this common name,
    # not the one checked in the cases below.
    self.check_common_name(stats, 'fakehostname')

    calls = []
    # The callback is called with server_name=None
    stats = connect_with(None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    calls = []
    server_context.set_servername_callback(None)

    stats = connect_with('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(calls, [])
3873
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003887
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately raise ZeroDivisionError inside the callback.
        1 / 0
    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    # The callback's exception must have been reported on stderr.
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003904
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_INTERNAL_ERROR')
    # The resulting TypeError must have been reported on stderr.
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003922
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must belong to the
    # family that the server's cipher string allows.
    client_context, server_context, hostname = testing_context()
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, _tls_version, _bits in ciphers:
        # A name matches if alg1 is one of its dash-separated parts or
        # alg2 appears anywhere in it.
        if alg1 not in name.split("-") and alg2 not in name:
            self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003943
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on a TLS socket that was connected and then
    # closed must raise ValueError.
    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)

    with echo_server:
        closed_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=hostname)
        closed_sock.connect((HOST, echo_server.port))
        closed_sock.close()

        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003956
def test_sendfile(self):
    """sendfile() over an SSL socket delivers the file's bytes to the peer.

    A 512-byte file is sent to the echo server and the data read back
    from the socket must equal the file's content.
    """
    TEST_DATA = b"x" * 512
    # Register the cleanup before creating the file so that a failure
    # while writing does not leave support.TESTFN behind.
    self.addCleanup(support.unlink, support.TESTFN)
    with open(support.TESTFN, 'wb') as f:
        f.write(TEST_DATA)

    # The same context is used for both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003973
def test_session(self):
    """Exercise session creation and reuse across four handshakes.

    Alternates between connecting without a session and connecting with
    the session from the first handshake, checking the session objects
    and the server context's 'accept' / 'hits' counters after each one.
    """
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server round trip with the shared contexts.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_server_stats(accept, hits):
        # Compare the server context's session counters.
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    result = handshake()
    session = result['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(result['session_reused'])
    check_server_stats(accept=1, hits=0)

    # reuse session
    result = handshake(session=session)
    check_server_stats(accept=2, hits=1)
    self.assertTrue(result['session_reused'])
    session2 = result['session']
    self.assertEqual(session2.id, session.id)
    # Equal to the original session, but a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    result = handshake()
    self.assertFalse(result['session_reused'])
    session3 = result['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_server_stats(accept=3, hits=1)

    # reuse session again
    result = handshake(session=session)
    self.assertTrue(result['session_reused'])
    session4 = result['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_server_stats(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004031
def test_session_handling(self):
    """Check the rules for reading and assigning SSLSocket.session.

    Covers: session is None before the handshake, assigning a
    non-session value raises TypeError, assigning after the handshake
    raises ValueError, assigning before connecting reuses the session,
    and a session from another SSLContext is rejected with ValueError.
    """
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment belongs inside assertRaises: it is
            # the statement expected to raise, and anything placed
            # after it in the block would never run.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004081
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004082
def test_main(verbose=False):
    """Run the SSL test classes, printing build information when verbose.

    The ``verbose`` parameter is accepted but not consulted; verbosity is
    read from ``support.verbose``.  Before running anything, every
    certificate file the tests rely on is checked for existence.

    Raises:
        support.TestFailed: if one of the certificate files is missing.
    """
    if support.verbose:
        import warnings
        plats = {}
        # platform.linux_distribution() is deprecated and absent from
        # newer Python versions, so only probe it when it exists instead
        # of failing with AttributeError.
        linux_distribution = getattr(platform, 'linux_distribution', None)
        if linux_distribution is not None:
            plats['Linux'] = linux_distribution
        plats['Mac'] = platform.mac_ver
        plats['Windows'] = platform.win32_ver
        with warnings.catch_warnings():
            # The deprecation has been reported under both categories
            # depending on the Python version, so silence either one.
            for category in (PendingDeprecationWarning, DeprecationWarning):
                warnings.filterwarnings(
                    'ignore',
                    r'dist\(\) and linux_distribution\(\) '
                    'functions are deprecated .*',
                    category,
                )
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No platform-specific probe returned anything useful.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # This ssl module does not define the constant.
            pass

    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    # Tests that need the 'network' resource are opt-in.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004136
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()