blob: 36580d55b9e22d14260d79febf847046755510b1 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
# The ssl module is obtained through test.support rather than a plain import.
ssl = support.import_module("ssl")


# Sorted keys of ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST

# Library flavour and version switches derived from the linked SSL library.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = (
    not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
)
IS_OPENSSL_1_1_1 = (
    not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
)

# Build-time configuration value (None when the variable is not set).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file stored next to this module.

    The path components in *name* are joined onto the directory that
    contains this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key + certificate, as str and as bytes path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)

# Certificate and key kept in separate files, as str and as bytes paths.
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Password-protected variants and the password that unlocks them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and two individual CA files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")

WRONG_CERT = data_file("wrongcert.pem")

# Expected decoding of CERTFILE (compared in test_parse_cert).
CERTFILE_INFO = {
    'issuer': (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    'notAfter': 'Jan 17 19:09:06 2028 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': 'F9BA076D5B6ABD9B',
    'subject': (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3,
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'

# Expected decoding of SIGNED_CERTFILE (compared in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': (
        'http://testca.pythontest.net/testca/revocation.crl',
    ),
    'issuer': (
        (('countryName', 'XY'),),
        (('organizationName', 'Python Software Foundation CA'),),
        (('commonName', 'our-ca-server'),),
    ),
    'notAfter': 'Nov 28 19:09:06 2027 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': '82EDBF41C880919C',
    'subject': (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3,
}

SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately unusable or unusual certificate/key files.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameters, as str and as bytes path.
DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
# Not defined in all versions of OpenSSL; a missing flag falls back to 0.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(
    ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(
    ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
def handle_error(prefix):
    """Write the current exception's traceback to stdout in verbose mode.

    The formatted traceback of ``sys.exc_info()`` is written to
    ``sys.stdout`` preceded by *prefix*; nothing is written unless
    ``support.verbose`` is true.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
144
def have_verify_flags():
    """Return True when the linked OpenSSL is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
148
def _have_secp_curves():
    """Return True when the "secp384r1" ECDH curve can be selected.

    False is returned when ``ssl.HAS_ECDH`` is false or when
    ``SSLContext.set_ecdh_curve`` rejects the curve with ValueError.
    """
    supported = False
    if ssl.HAS_ECDH:
        probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe.set_ecdh_curve("secp384r1")
        except ValueError:
            supported = False
        else:
            supported = True
    return supported


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
162
163
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
169
def asn1time(cert_time):
    """Return *cert_time* adjusted for the OpenSSL 0.9.8i seconds quirk.

    On every other API version the string is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    stamp = datetime.datetime.strptime(cert_time, layout).replace(second=0)
    adjusted = stamp.strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator: skip *func* on the patched OpenSSL build from issue #9415.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned as is.
    Otherwise the wrapper tries to create an SSLv2 context before each call
    and raises unittest.SkipTest if that fails on the specific OpenSSL
    version / distribution pair checked below.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_openssl = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            if (affected_openssl and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
# Decorator for tests that can only run when ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; the returned
    hostname is the matching *_HOSTNAME constant. Any other value raises
    ValueError(server_cert).

    Both contexts load SIGNING_CA as their verification store; the server
    context additionally loads *server_cert* as its certificate chain.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # Previously this line loaded the CA into client_context a second time,
    # leaving the server context without any verification store.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
def test_random(self):
    v = ssl.RAND_status()
    if support.verbose:
        verdict = ("sufficient randomness" if v
                   else "insufficient randomness")
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, verdict))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, v == 1)
    # RAND_bytes only succeeds when the PRNG reports enough entropy.
    if not v:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)

    # RAND_add is called with str, bytes and bytearray seeds.
    seeds = (
        "this is a random string",
        b"this is a random bytes object",
        bytearray(b"this is a random bytearray object"),
    )
    for seed in seeds:
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # After a fork, parent and child must not produce the same
    # pseudo-random bytes.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_end, write_end = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: send 16 pseudo-random bytes through the pipe and report
        # success or failure purely through the exit status.
        try:
            os.close(read_end)
            sample = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(sample), 16)
            os.write(write_end, sample)
            os.close(write_end)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)

    # Parent only from here on: the child always leaves via os._exit().
    os.close(write_end)
    self.addCleanup(os.close, read_end)
    _, exit_status = os.waitpid(child_pid, 0)
    self.assertEqual(exit_status, 0)

    from_child = os.read(read_end, 16)
    self.assertEqual(len(from_child), 16)
    from_parent = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(from_parent), 16)

    self.assertNotEqual(from_child, from_parent)
337
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    known_certs = (
        (CERTFILE, CERTFILE_INFO),
        (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO),
    )
    for cert_path, expected in known_certs:
        self.assertEqual(decode(cert_path), expected)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (
        ('DNS', 'projects.developer.nokia.com'),
        ('DNS', 'projects.forum.nokia.com'),
    )
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        nokia['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        nokia['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
def test_parse_cert_CVE_2013_4238(self):
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")

    # The embedded NUL bytes must be preserved in the decoded names.
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200395
def test_parse_all_sans(self):
    # ALLSANFILE carries one subjectAltName entry of every kind.
    directory_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', directory_name),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(decoded['subjectAltName'], expected_san)
416
def test_DER_to_PEM(self):
    # Round trip PEM -> DER -> PEM -> DER; both DER forms must agree.
    with open(CAFILE_CACERT, 'r') as cert_file:
        original_pem = cert_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    rebuilt_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(rebuilt_pem)
    self.assertEqual(first_der, second_der)

    # The regenerated PEM text must be framed by header and footer lines.
    has_header = rebuilt_pem.startswith(ssl.PEM_HEADER + '\n')
    if not has_header:
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % rebuilt_pem)
    has_footer = rebuilt_pem.endswith('\n' + ssl.PEM_FOOTER + '\n')
    if not has_footer:
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % rebuilt_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000428
def test_openssl_version(self):
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    for value, expected_type in ((n, int), (t, tuple), (s, str)):
        self.assertIsInstance(value, expected_type)

    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 3.0
    self.assertLess(n, 0x30000000)

    major, minor, fix, patch, status = t
    # Each component is checked against 0 and then against its upper
    # limit (exclusive for assertLess, inclusive for assertLessEqual).
    component_limits = (
        (major, self.assertLess, 3),
        (minor, self.assertLess, 256),
        (fix, self.assertLess, 256),
        (patch, self.assertLessEqual, 63),
        (status, self.assertLessEqual, 15),
    )
    for component, check_upper, limit in component_limits:
        self.assertGreaterEqual(component, 0)
        check_upper(component, limit)

    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(s.startswith(expected_prefix), (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000459
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    raw_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(raw_sock)
    tracker = weakref.ref(wrapped)
    # Dropping the only strong reference happens under check_warnings
    # for ResourceWarning; afterwards the weak reference must be dead.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(tracker(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000470
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock) as ss:
        failing_calls = (
            (OSError, ss.recv, (1,)),
            (OSError, ss.recv_into, (bytearray(b'x'),)),
            (OSError, ss.recvfrom, (1,)),
            (OSError, ss.recvfrom_into, (bytearray(b'x'), 1)),
            (OSError, ss.send, (b'x',)),
            (OSError, ss.sendto, (b'x', ('0.0.0.0', 0))),
            (NotImplementedError, ss.sendmsg,
             ([b'x'], (), 0, ('0.0.0.0', 0))),
        )
        for expected_exc, method, call_args in failing_calls:
            self.assertRaises(expected_exc, method, *call_args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000484
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    timeouts = (None, 0.0, 5.0)
    for expected in timeouts:
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(expected)
        with test_wrap_socket(plain_sock) as wrapped:
            self.assertEqual(expected, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000493
def test_errors_sslwrap(self):
    sock = socket.socket()

    # Invalid certfile/keyfile combinations are rejected with ValueError.
    rejected_options = (
        ("certfile must be specified",
         {"keyfile": CERTFILE}),
        ("certfile must be specified for server-side operations",
         {"server_side": True}),
        ("certfile must be specified for server-side operations",
         {"server_side": True, "certfile": ""}),
    )
    for message, options in rejected_options:
        self.assertRaisesRegex(ValueError, message,
                               ssl.wrap_socket, sock, **options)

    # A server-side socket refuses to connect().
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError,
                               "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # A missing certificate and/or key file surfaces as OSError(ENOENT).
    missing_file_options = (
        {"certfile": NONEXISTINGCERT},
        {"certfile": CERTFILE, "keyfile": NONEXISTINGCERT},
        {"certfile": NONEXISTINGCERT, "keyfile": NONEXISTINGCERT},
    )
    for options in missing_file_options:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **options)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # *certfile* is resolved relative to this module's directory (or the
    # current directory when __file__ has no directory part).
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    raw_sock = socket.socket()
    self.addCleanup(raw_sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(raw_sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000532
533 def test_empty_cert(self):
534 """Wrapping with an empty cert file"""
535 self.bad_cert_test("nullcert.pem")
536
537 def test_malformed_cert(self):
538 """Wrapping with a badly formatted certificate (syntax error)"""
539 self.bad_cert_test("badcert.pem")
540
541 def test_malformed_key(self):
542 """Wrapping with a badly formatted key (syntax error)"""
543 self.bad_cert_test("badkey.pem")
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 def test_match_hostname(self):
546 def ok(cert, hostname):
547 ssl.match_hostname(cert, hostname)
548 def fail(cert, hostname):
549 self.assertRaises(ssl.CertificateError,
550 ssl.match_hostname, cert, hostname)
551
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100552 # -- Hostname matching --
553
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000554 cert = {'subject': ((('commonName', 'example.com'),),)}
555 ok(cert, 'example.com')
556 ok(cert, 'ExAmple.cOm')
557 fail(cert, 'www.example.com')
558 fail(cert, '.example.com')
559 fail(cert, 'example.org')
560 fail(cert, 'exampleXcom')
561
562 cert = {'subject': ((('commonName', '*.a.com'),),)}
563 ok(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.a.com')
565 fail(cert, 'a.com')
566 fail(cert, 'Xa.com')
567 fail(cert, '.a.com')
568
Mandeep Singhede2ac92017-11-27 04:01:27 +0530569 # only match wildcards when they are the only thing
570 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000571 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530572 fail(cert, 'foo.com')
573 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000574 fail(cert, 'bar.com')
575 fail(cert, 'foo.a.com')
576 fail(cert, 'bar.foo.com')
577
Christian Heimes824f7f32013-08-17 00:54:47 +0200578 # NULL bytes are bad, CVE-2013-4073
579 cert = {'subject': ((('commonName',
580 'null.python.org\x00example.org'),),)}
581 ok(cert, 'null.python.org\x00example.org') # or raise an error?
582 fail(cert, 'example.org')
583 fail(cert, 'null.python.org')
584
Georg Brandl72c98d32013-10-27 07:16:53 +0100585 # error cases with wildcards
586 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
592 cert = {'subject': ((('commonName', 'a.*.com'),),)}
593 fail(cert, 'a.foo.com')
594 fail(cert, 'a..com')
595 fail(cert, 'a.com')
596
597 # wildcard doesn't match IDNA prefix 'xn--'
598 idna = 'püthon.python.org'.encode("idna").decode("ascii")
599 cert = {'subject': ((('commonName', idna),),)}
600 ok(cert, idna)
601 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
602 fail(cert, idna)
603 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
604 fail(cert, idna)
605
606 # wildcard in first fragment and IDNA A-labels in sequent fragments
607 # are supported.
608 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530610 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
611 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
613 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
614
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000615 # Slightly fake real-world example
616 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
617 'subject': ((('commonName', 'linuxfrz.org'),),),
618 'subjectAltName': (('DNS', 'linuxfr.org'),
619 ('DNS', 'linuxfr.com'),
620 ('othername', '<unsupported>'))}
621 ok(cert, 'linuxfr.org')
622 ok(cert, 'linuxfr.com')
623 # Not a "DNS" entry
624 fail(cert, '<unsupported>')
625 # When there is a subjectAltName, commonName isn't used
626 fail(cert, 'linuxfrz.org')
627
628 # A pristine real-world example
629 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
630 'subject': ((('countryName', 'US'),),
631 (('stateOrProvinceName', 'California'),),
632 (('localityName', 'Mountain View'),),
633 (('organizationName', 'Google Inc'),),
634 (('commonName', 'mail.google.com'),))}
635 ok(cert, 'mail.google.com')
636 fail(cert, 'gmail.com')
637 # Only commonName is considered
638 fail(cert, 'California')
639
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100640 # -- IPv4 matching --
641 cert = {'subject': ((('commonName', 'example.com'),),),
642 'subjectAltName': (('DNS', 'example.com'),
643 ('IP Address', '10.11.12.13'),
644 ('IP Address', '14.15.16.17'))}
645 ok(cert, '10.11.12.13')
646 ok(cert, '14.15.16.17')
647 fail(cert, '14.15.16.18')
648 fail(cert, 'example.net')
649
650 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800651 if hasattr(socket, 'AF_INET6'):
652 cert = {'subject': ((('commonName', 'example.com'),),),
653 'subjectAltName': (
654 ('DNS', 'example.com'),
655 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
656 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
657 ok(cert, '2001::cafe')
658 ok(cert, '2003::baba')
659 fail(cert, '2003::bebe')
660 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661
662 # -- Miscellaneous --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Neither commonName nor subjectAltName
665 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
666 'subject': ((('countryName', 'US'),),
667 (('stateOrProvinceName', 'California'),),
668 (('localityName', 'Mountain View'),),
669 (('organizationName', 'Google Inc'),))}
670 fail(cert, 'mail.google.com')
671
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200672 # No DNS entry in subjectAltName but a commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('commonName', 'mail.google.com'),)),
678 'subjectAltName': (('othername', 'blabla'), )}
679 ok(cert, 'mail.google.com')
680
681 # No DNS entry subjectAltName and no commonName
682 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
683 'subject': ((('countryName', 'US'),),
684 (('stateOrProvinceName', 'California'),),
685 (('localityName', 'Mountain View'),),
686 (('organizationName', 'Google Inc'),)),
687 'subjectAltName': (('othername', 'blabla'),)}
688 fail(cert, 'google.com')
689
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000690 # Empty cert / no cert
691 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
692 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
693
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200694 # Issue #17980: avoid denials of service by refusing more than one
695 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800696 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "partial wildcards in leftmost label are not supported"):
700 ssl.match_hostname(cert, 'axxb.example.com')
701
702 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "wildcard can only be present in the leftmost label"):
706 ssl.match_hostname(cert, 'www.sub.example.com')
707
708 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 "too many wildcards"):
712 ssl.match_hostname(cert, 'axxbxxc.example.com')
713
714 cert = {'subject': ((('commonName', '*'),),)}
715 with self.assertRaisesRegex(
716 ssl.CertificateError,
717 "sole wildcard without additional labels are not support"):
718 ssl.match_hostname(cert, 'host')
719
720 cert = {'subject': ((('commonName', '*.com'),),)}
721 with self.assertRaisesRegex(
722 ssl.CertificateError,
723 r"hostname 'com' doesn't match '\*.com'"):
724 ssl.match_hostname(cert, 'com')
725
726 # extra checks for _inet_paton()
727 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
728 with self.assertRaises(ValueError):
729 ssl._inet_paton(invalid)
730 for ipaddr in ['127.0.0.1', '192.168.0.1']:
731 self.assertTrue(ssl._inet_paton(ipaddr))
732 if hasattr(socket, 'AF_INET6'):
733 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
734 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200735
Antoine Pitroud5323212010-10-22 18:19:07 +0000736 def test_server_side(self):
737 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000739 with socket.socket() as sock:
740 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
741 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000742
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.socket(socket.AF_INET)
    # Register the close immediately so the listening socket is released
    # even if bind/connect or an assertion below fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers a failure before the wrapped socket's context manager takes
    # over; socket.close() is idempotent, so a second close is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected client socket should return None for a known
    # binding type ...
    client_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_sock) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
    # ... and the same holds for an unconnected server-side socket.
    server_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_sock, server_side=True,
                          certfile=CERTFILE) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200766
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must
    # emit a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
774
def test_get_default_verify_paths(self):
    # The result is a six-element ssl.DefaultVerifyPaths.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # With both environment overrides set, cafile/capath must echo them.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
786
Christian Heimes44109d72013-11-22 01:51:30 +0100787 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
788 def test_enum_certificates(self):
789 self.assertTrue(ssl.enum_certificates("CA"))
790 self.assertTrue(ssl.enum_certificates("ROOT"))
791
792 self.assertRaises(TypeError, ssl.enum_certificates)
793 self.assertRaises(WindowsError, ssl.enum_certificates, "")
794
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 trust_oids = set()
796 for storename in ("CA", "ROOT"):
797 store = ssl.enum_certificates(storename)
798 self.assertIsInstance(store, list)
799 for element in store:
800 self.assertIsInstance(element, tuple)
801 self.assertEqual(len(element), 3)
802 cert, enc, trust = element
803 self.assertIsInstance(cert, bytes)
804 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
805 self.assertIsInstance(trust, (set, bool))
806 if isinstance(trust, set):
807 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100808
809 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100810 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200811
Christian Heimes46bebee2013-06-09 19:03:31 +0200812 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100813 def test_enum_crls(self):
814 self.assertTrue(ssl.enum_crls("CA"))
815 self.assertRaises(TypeError, ssl.enum_crls)
816 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200817
Christian Heimes44109d72013-11-22 01:51:30 +0100818 crls = ssl.enum_crls("CA")
819 self.assertIsInstance(crls, list)
820 for element in crls:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 2)
823 self.assertIsInstance(element[0], bytes)
824 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100827 def test_asn1object(self):
828 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
829 '1.3.6.1.5.5.7.3.1')
830
831 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
832 self.assertEqual(val, expected)
833 self.assertEqual(val.nid, 129)
834 self.assertEqual(val.shortname, 'serverAuth')
835 self.assertEqual(val.longname, 'TLS Web Server Authentication')
836 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
839
840 val = ssl._ASN1Object.fromnid(129)
841 self.assertEqual(val, expected)
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100844 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
845 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100846 for i in range(1000):
847 try:
848 obj = ssl._ASN1Object.fromnid(i)
849 except ValueError:
850 pass
851 else:
852 self.assertIsInstance(obj.nid, int)
853 self.assertIsInstance(obj.shortname, str)
854 self.assertIsInstance(obj.longname, str)
855 self.assertIsInstance(obj.oid, (str, type(None)))
856
857 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
858 self.assertEqual(val, expected)
859 self.assertIsInstance(val, ssl._ASN1Object)
860 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
861 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
862 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100863 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
864 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100865
Christian Heimes72d28502013-11-23 13:56:58 +0100866 def test_purpose_enum(self):
867 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
868 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
873 '1.3.6.1.5.5.7.3.1')
874
875 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
876 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
881 '1.3.6.1.5.5.7.3.2')
882
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must fail with NotImplementedError,
    # both through the module-level helper and through a context.
    expected_message = "only stream sockets are supported"
    dgram = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram.close)

    with self.assertRaises(NotImplementedError) as raised:
        test_wrap_socket(dgram, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(raised.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as raised:
        client_ctx.wrap_socket(dgram)
    self.assertEqual(str(raised.exception), expected_message)
893
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must convert to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
896
def cert_time_fail(self, timestring):
    # Helper: *timestring* must be rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
900
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_instants = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, seconds in known_instants:
        self.cert_time_ok(timestring, seconds)
908
def test_cert_time_to_seconds(self):
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)

    # Alternative spellings of the same instant.
    for variant in (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    ):
        self.cert_time_ok(variant, reference_ts)

    # Malformed or out-of-range inputs.
    for rejected in (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    ):
        self.cert_time_fail(rejected)

    # A leap second and the following midnight map to the same timestamp.
    new_year_ts = 1230768000.0
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", new_year_ts)
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", new_year_ts)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # allow 60th second (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # allow 2nd leap second for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
943
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as formatted under the active locale.
    february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale abbreviation is accepted; the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february + " 9 00:00:00 2007 GMT")
958
def test_connect_ex_error(self):
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    # Reserve port but don't listen
    port = support.bind_port(listener)
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    # connect_ex() reports the failure as a return code.
    rc = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(rc, acceptable)
974
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100975
Antoine Pitrou152efa22010-05-16 18:19:27 +0000976class ContextTests(unittest.TestCase):
977
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every protocol constant in PROTOCOLS must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # With no argument the context reports PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
986
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute echoes the constructor argument.
    for expected in PROTOCOLS:
        created = ssl.SSLContext(expected)
        self.assertEqual(created.protocol, expected)
992
993 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.set_ciphers("ALL")
996 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000997 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000998 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in any default suite name.
    banned_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in banned_markers:
            self.assertNotIn(marker, suite_name)
1012
Christian Heimes25bfcd52016-09-06 00:04:45 +02001013 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1014 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001016 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001017 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001018 self.assertIn('AES256-GCM-SHA384', names)
1019 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001020
@skip_if_broken_ubuntu_ssl
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    expected |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                 OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(expected, client_ctx.options)

    # Adding a flag is always possible.
    client_ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Without support for clearing, resetting the options fails.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options &= ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 def test_verify_mode_protocol(self):
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 # Default value
1046 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1047 ctx.verify_mode = ssl.CERT_OPTIONAL
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1049 ctx.verify_mode = ssl.CERT_REQUIRED
1050 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1051 ctx.verify_mode = ssl.CERT_NONE
1052 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1053 with self.assertRaises(TypeError):
1054 ctx.verify_mode = None
1055 with self.assertRaises(ValueError):
1056 ctx.verify_mode = 42
1057
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 self.assertFalse(ctx.check_hostname)
1061
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1064 self.assertTrue(ctx.check_hostname)
1065
Christian Heimes61d478c2018-01-27 15:51:38 +01001066 def test_hostname_checks_common_name(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertTrue(ctx.hostname_checks_common_name)
1069 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1070 ctx.hostname_checks_common_name = True
1071 self.assertTrue(ctx.hostname_checks_common_name)
1072 ctx.hostname_checks_common_name = False
1073 self.assertFalse(ctx.hostname_checks_common_name)
1074 ctx.hostname_checks_common_name = True
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 else:
1077 with self.assertRaises(AttributeError):
1078 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001079
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001080 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1081 "required OpenSSL 1.1.0g")
1082 def test_min_max_version(self):
1083 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1084 self.assertEqual(
1085 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1086 )
1087 self.assertEqual(
1088 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1089 )
1090
1091 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1092 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1093 self.assertEqual(
1094 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1095 )
1096 self.assertEqual(
1097 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1098 )
1099
1100 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1101 ctx.maximum_version = ssl.TLSVersion.TLSv1
1102 self.assertEqual(
1103 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1104 )
1105 self.assertEqual(
1106 ctx.maximum_version, ssl.TLSVersion.TLSv1
1107 )
1108
1109 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1110 self.assertEqual(
1111 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1112 )
1113
1114 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1115 self.assertIn(
1116 ctx.maximum_version,
1117 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1118 )
1119
1120 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1121 self.assertIn(
1122 ctx.minimum_version,
1123 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1124 )
1125
1126 with self.assertRaises(ValueError):
1127 ctx.minimum_version = 42
1128
1129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1130
1131 self.assertEqual(
1132 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1133 )
1134 self.assertEqual(
1135 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1136 )
1137 with self.assertRaises(ValueError):
1138 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1139 with self.assertRaises(ValueError):
1140 ctx.maximum_version = ssl.TLSVersion.TLSv1
1141
1142
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value; VERIFY_X509_TRUSTED_FIRST is included only when
    # the ssl module defines it
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Every assigned value, including a combination of flags, must be
    # read back unchanged.
    assignable = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignable:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1162
def test_load_cert_chain(self):
    def fresh_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    def expect_pem_error(context, *args, **kwargs):
        # The load must fail with an SSLError mentioning "PEM lib".
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(*args, **kwargs)

    def password_forms():
        # The same password as str, bytes and a fresh bytearray.
        raw = KEY_PASSWORD.encode()
        return (KEY_PASSWORD, raw, bytearray(raw))

    # Combined key and cert in a single file
    ctx = fresh_context()
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    expect_pem_error(ctx, BADCERT)
    expect_pem_error(ctx, EMPTYCERT)

    # Separate key and cert
    ctx = fresh_context()
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    expect_pem_error(ctx, ONLYCERT)
    expect_pem_error(ctx, ONLYKEY)
    expect_pem_error(ctx, certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = fresh_context()
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD

    def failing_callback():
        raise Exception('getpass error')

    # Plain functions, a callable instance and a bound method are all
    # accepted, returning str, bytes or bytearray.
    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        GetPassCallable(),
        GetPassCallable().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001245
def test_load_verify_locations(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    load = server_ctx.load_verify_locations

    # A CA file may be given as str or bytes, positionally or by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        load(cafile)
        load(cafile=cafile, capath=None)

    # At least one location is required; all-None is not enough.
    with self.assertRaises(TypeError):
        load()
    with self.assertRaises(TypeError):
        load(None, None, None)

    with self.assertRaises(OSError) as missing:
        load(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        load(BADCERT)

    # A CA directory may be combined with a CA file, as str or bytes.
    load(CERTFILE, CAPATH)
    load(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        load(None, True)
1264
def test_load_verify_cadata(self):
    # Exercise load_verify_locations(cadata=...) with PEM text and DER
    # bytes, watching the 'x509_ca' counter of cert_store_stats().
    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_ca_count(context, count):
        self.assertEqual(context.cert_store_stats()["x509_ca"], count)

    pem_cacert = read_text(CAFILE_CACERT)
    der_cacert = ssl.PEM_cert_to_DER_cert(pem_cacert)
    pem_neuronio = read_text(CAFILE_NEURONIO)
    der_neuronio = ssl.PEM_cert_to_DER_cert(pem_neuronio)

    # PEM, one certificate per call
    context = new_context()
    expect_ca_count(context, 0)
    context.load_verify_locations(cadata=pem_cacert)
    expect_ca_count(context, 1)
    context.load_verify_locations(cadata=pem_neuronio)
    expect_ca_count(context, 2)
    # a certificate that is already stored must not be counted twice
    context.load_verify_locations(cadata=pem_neuronio)
    expect_ca_count(context, 2)

    # PEM, both certificates in a single string
    context = new_context()
    context.load_verify_locations(
        cadata="\n".join((pem_cacert, pem_neuronio)))
    expect_ca_count(context, 2)

    # PEM, with junk before, between and after the certificates
    context = new_context()
    noisy_parts = ["head", pem_cacert, "other", pem_neuronio, "again",
                   pem_neuronio, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    expect_ca_count(context, 2)

    # DER, one certificate per call
    context = new_context()
    context.load_verify_locations(cadata=der_cacert)
    context.load_verify_locations(cadata=der_neuronio)
    expect_ca_count(context, 2)
    # a certificate that is already stored must not be counted twice
    context.load_verify_locations(cadata=der_cacert)
    expect_ca_count(context, 2)

    # DER, both certificates concatenated
    context = new_context()
    context.load_verify_locations(
        cadata=b"".join((der_cacert, der_neuronio)))
    expect_ca_count(context, 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1321
1322
def test_load_dh_params(self):
    # load_dh_params() accepts a str path (and a bytes path outside
    # Windows) and reports bad arguments, missing files and non-DH files.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_dh_params(DHFILE)
    if not os.name == 'nt':
        server_ctx.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        server_ctx.load_dh_params()
    with self.assertRaises(TypeError):
        server_ctx.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as missing:
        server_ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # A certificate file is not a DH parameter file
    with self.assertRaises(ssl.SSLError):
        server_ctx.load_dh_params(CERTFILE)
1335
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context reports zero for every session counter,
    # whatever the protocol.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        fresh_ctx = ssl.SSLContext(protocol)
        self.assertEqual(fresh_ctx.session_stats(), all_zero)
1353
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001354 def test_set_default_verify_paths(self):
1355 # There's not much we can do to test that it acts as expected,
1356 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001357 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001358 ctx.set_default_verify_paths()
1359
Antoine Pitrou501da612011-12-21 09:27:41 +01001360 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001361 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001363 ctx.set_ecdh_curve("prime256v1")
1364 ctx.set_ecdh_curve(b"prime256v1")
1365 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1367 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1369
@needs_sni
def test_sni_callback(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects exactly one argument...
    with self.assertRaises(TypeError):
        server_ctx.set_servername_callback()
    # ...which has to be a callable, or None
    for not_callable in (4, "", server_ctx):
        with self.assertRaises(TypeError):
            server_ctx.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass

    for accepted in (None, dummycallback):
        server_ctx.set_servername_callback(accepted)
1384
1385 @needs_sni
1386 def test_sni_callback_refcycle(self):
1387 # Reference cycles through the servername callback are detected
1388 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001389 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001390 def dummycallback(sock, servername, ctx, cycle=ctx):
1391 pass
1392 ctx.set_servername_callback(dummycallback)
1393 wr = weakref.ref(ctx)
1394 del ctx, dummycallback
1395 gc.collect()
1396 self.assertIs(wr(), None)
1397
def test_cert_store_stats(self):
    # Follow cert_store_stats() while certificates are being loaded.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_stats(x509_ca, x509):
        self.assertEqual(client_ctx.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    expect_stats(x509_ca=0, x509=0)
    # Loading our own certificate chain leaves the store counters alone
    client_ctx.load_cert_chain(CERTFILE)
    expect_stats(x509_ca=0, x509=0)
    # CERTFILE is counted as a plain certificate, not as a CA
    client_ctx.load_verify_locations(CERTFILE)
    expect_stats(x509_ca=0, x509=1)
    # CAFILE_CACERT is counted both as a certificate and as a CA
    client_ctx.load_verify_locations(CAFILE_CACERT)
    expect_stats(x509_ca=1, x509=2)
1411
1412 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001413 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001414 self.assertEqual(ctx.get_ca_certs(), [])
1415 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1416 ctx.load_verify_locations(CERTFILE)
1417 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001418 # but CAFILE_CACERT is a CA cert
1419 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001420 self.assertEqual(ctx.get_ca_certs(),
1421 [{'issuer': ((('organizationName', 'Root CA'),),
1422 (('organizationalUnitName', 'http://www.cacert.org'),),
1423 (('commonName', 'CA Cert Signing Authority'),),
1424 (('emailAddress', 'support@cacert.org'),)),
1425 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1426 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1427 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001428 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001429 'subject': ((('organizationName', 'Root CA'),),
1430 (('organizationalUnitName', 'http://www.cacert.org'),),
1431 (('commonName', 'CA Cert Signing Authority'),),
1432 (('emailAddress', 'support@cacert.org'),)),
1433 'version': 3}])
1434
Martin Panterb55f8b72016-01-14 12:53:56 +00001435 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001436 pem = f.read()
1437 der = ssl.PEM_cert_to_DER_cert(pem)
1438 self.assertEqual(ctx.get_ca_certs(True), [der])
1439
Christian Heimes72d28502013-11-23 13:56:58 +01001440 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001441 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001442 ctx.load_default_certs()
1443
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001445 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1446 ctx.load_default_certs()
1447
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001449 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1450
Christian Heimesa170fa12017-09-15 20:27:30 +02001451 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001452 self.assertRaises(TypeError, ctx.load_default_certs, None)
1453 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1454
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data, the
    # default store is expected to hold exactly one plain certificate.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, location in overrides:
            env[variable] = location
        client_ctx.load_default_certs()
        self.assertEqual(client_ctx.cert_store_stats(),
                         {"crl": 0, "x509": 1, "x509_ca": 0})
1464
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # First record what the default store contains on its own...
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    baseline = baseline_ctx.cert_store_stats()

    # ...then check that the environment overrides add one certificate
    # on top of that baseline.
    env_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        env_ctx.load_default_certs()
        expected = dict(baseline, x509=baseline["x509"] + 1)
        self.assertEqual(env_ctx.cert_store_stats(), expected)
1479
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set on ctx.options.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when their module-level
    # constant is non-zero.
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1494
def test_create_default_context(self):
    # create_default_context() always yields a PROTOCOL_TLS context with
    # the hardened options; the verify mode depends on the purpose.
    def check(context, expected_mode, hostname_checked=False):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, expected_mode)
        if hostname_checked:
            self.assertTrue(context.check_hostname)
        self._assert_context_options(context)

    # Default purpose
    check(ssl.create_default_context(), ssl.CERT_REQUIRED,
          hostname_checked=True)

    # Explicit CA file, CA directory and in-memory CA data together
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    with_cas = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                          cadata=ca_text)
    check(with_cas, ssl.CERT_REQUIRED)

    # Context for authenticating clients
    check(ssl.create_default_context(ssl.Purpose.CLIENT_AUTH),
          ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001515
def test__create_stdlib_context(self):
    # _create_stdlib_context() defaults to PROTOCOL_TLS / CERT_NONE and
    # honours explicitly passed protocol, cert_reqs and check_hostname.
    def check(context, expected_protocol, expected_mode,
              hostname_checked=None):
        self.assertEqual(context.protocol, expected_protocol)
        self.assertEqual(context.verify_mode, expected_mode)
        # hostname_checked=None means this case does not look at it
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    check(ssl._create_stdlib_context(),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
          ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    strict = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        check_hostname=True)
    check(strict, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
          hostname_checked=True)

    check(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
Christian Heimes1aa9a752013-12-02 02:41:19 +01001541 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001545
Christian Heimese82c0342017-09-15 20:29:57 +02001546 # Auto set CERT_REQUIRED
1547 ctx.check_hostname = True
1548 self.assertTrue(ctx.check_hostname)
1549 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1550 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001551 ctx.verify_mode = ssl.CERT_REQUIRED
1552 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001553 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001554
Christian Heimese82c0342017-09-15 20:29:57 +02001555 # Changing verify_mode does not affect check_hostname
1556 ctx.check_hostname = False
1557 ctx.verify_mode = ssl.CERT_NONE
1558 ctx.check_hostname = False
1559 self.assertFalse(ctx.check_hostname)
1560 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1561 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001562 ctx.check_hostname = True
1563 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001564 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1565
1566 ctx.check_hostname = False
1567 ctx.verify_mode = ssl.CERT_OPTIONAL
1568 ctx.check_hostname = False
1569 self.assertFalse(ctx.check_hostname)
1570 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1571 # keep CERT_OPTIONAL
1572 ctx.check_hostname = True
1573 self.assertTrue(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
1576 # Cannot set CERT_NONE with check_hostname enabled
1577 with self.assertRaises(ValueError):
1578 ctx.verify_mode = ssl.CERT_NONE
1579 ctx.check_hostname = False
1580 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001581 ctx.verify_mode = ssl.CERT_NONE
1582 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001583
Christian Heimes5fe668c2016-09-12 00:01:11 +02001584 def test_context_client_server(self):
1585 # PROTOCOL_TLS_CLIENT has sane defaults
1586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1587 self.assertTrue(ctx.check_hostname)
1588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 # PROTOCOL_TLS_SERVER has different but also sane defaults
1591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1592 self.assertFalse(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1594
Christian Heimes4df60f12017-09-15 20:26:05 +02001595 def test_context_custom_class(self):
1596 class MySSLSocket(ssl.SSLSocket):
1597 pass
1598
1599 class MySSLObject(ssl.SSLObject):
1600 pass
1601
1602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1603 ctx.sslsocket_class = MySSLSocket
1604 ctx.sslobject_class = MySSLObject
1605
1606 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1607 self.assertIsInstance(sock, MySSLSocket)
1608 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1609 self.assertIsInstance(obj, MySSLObject)
1610
Antoine Pitrou152efa22010-05-16 18:19:27 +00001611
class SSLErrorTests(unittest.TestCase):
    # Checks on ssl.SSLError: its str(), its attributes, the subclass that
    # gets raised, and argument validation of server_hostname.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            # The raw client socket is managed by `with` too, so it is
            # closed even if connect() or wrap_socket() raises.
            with socket.socket() as raw_client:
                raw_client.connect(listener.getsockname())
                raw_client.setblocking(False)
                with ctx.wrap_socket(raw_client, False,
                                     do_handshake_on_connect=False) as client:
                    # Nothing ever answers on the listening side, so the
                    # non-blocking handshake has to ask for more input.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        client.do_handshake()
                    message = str(cm.exception)
                    self.assertTrue(
                        message.startswith(
                            "The operation did not complete (read)"),
                        message)
                    # For compatibility
                    self.assertEqual(cm.exception.errno,
                                     ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        ctx = ssl.create_default_context()
        # Empty names and names with a leading dot are rejected as values
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        # An embedded NUL is rejected with TypeError
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001666
1667
class MemoryBIOTests(unittest.TestCase):
    # Behaviour of ssl.MemoryBIO as an in-memory byte queue.

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads hand out at most the requested number of bytes
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()

        def read_expect(expected, eof_after, *size):
            self.assertEqual(buf.read(*size), expected)
            if eof_after:
                self.assertTrue(buf.eof)
            else:
                self.assertFalse(buf.eof)

        self.assertFalse(buf.eof)
        read_expect(b'', False)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # eof only becomes true once the data written before write_eof()
        # has been read
        buf.write_eof()
        self.assertFalse(buf.eof)
        read_expect(b'fo', False, 2)
        read_expect(b'o', True, 1)
        read_expect(b'', True)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Drain one byte at a time
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # Refill one byte at a time
        for queued in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, queued)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_a_buffer)
1729
1730
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # Calling the SSLObject constructor directly must fail with a
        # TypeError that mentions the missing public constructor.
        memory_bio = ssl.MemoryBIO()
        self.assertRaisesRegex(TypeError, "public constructor",
                               ssl.SSLObject, memory_bio, memory_bio)
1736
1737
Martin Panter3840b2a2016-03-27 01:53:46 +00001738class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001739 """Tests that connect to a simple server running in the background"""
1740
1741 def setUp(self):
1742 server = ThreadedEchoServer(SIGNED_CERTFILE)
1743 self.server_addr = (HOST, server.port)
1744 server.__enter__()
1745 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001746
def test_connect(self):
    # Without verification the peer certificate is reported as empty
    unverified_raw = socket.socket(socket.AF_INET)
    with test_wrap_socket(unverified_raw,
                          cert_reqs=ssl.CERT_NONE) as unverified:
        unverified.connect(self.server_addr)
        self.assertEqual({}, unverified.getpeercert())
        self.assertFalse(unverified.server_side)

    # this should succeed because we specify the root cert
    verified_raw = socket.socket(socket.AF_INET)
    with test_wrap_socket(verified_raw,
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as verified:
        verified.connect(self.server_addr)
        self.assertTrue(verified.getpeercert())
        self.assertFalse(verified.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001761
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    raw_sock = socket.socket(socket.AF_INET)
    tls_sock = test_wrap_socket(raw_sock, cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(tls_sock.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        tls_sock.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001771
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    raw_sock = socket.socket(socket.AF_INET)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    self.addCleanup(tls_sock.close)
    # A blocking connect_ex() reports success as 0
    status = tls_sock.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(tls_sock.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001780
1781 def test_non_blocking_connect_ex(self):
1782 # Issue #11326: non-blocking connect_ex() should allow handshake
1783 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001784 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 cert_reqs=ssl.CERT_REQUIRED,
1786 ca_certs=SIGNING_CA,
1787 do_handshake_on_connect=False)
1788 self.addCleanup(s.close)
1789 s.setblocking(False)
1790 rc = s.connect_ex(self.server_addr)
1791 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1792 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1793 # Wait for connect to finish
1794 select.select([], [s], [], 5.0)
1795 # Non-blocking handshake
1796 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001797 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001798 s.do_handshake()
1799 break
1800 except ssl.SSLWantReadError:
1801 select.select([s], [], [], 5.0)
1802 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001803 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001804 # SSL established
1805 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001806
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrapped_client(**wrap_options):
        return context.wrap_socket(socket.socket(socket.AF_INET),
                                   **wrap_options)

    with wrapped_client() as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
    # Same with a server hostname
    with wrapped_client(server_hostname="dummy") as client:
        client.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrapped_client() as client:
        client.connect(self.server_addr)
        self.assertTrue(client.getpeercert())
1824
1825 def test_connect_with_context_fail(self):
1826 # This should fail because we have no verification certs. Connection
1827 # failure crashes ThreadedEchoServer, so run this in an independent
1828 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001829 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001830 ctx.verify_mode = ssl.CERT_REQUIRED
1831 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1832 self.addCleanup(s.close)
1833 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1834 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001835
1836 def test_connect_capath(self):
1837 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001838 # NOTE: the subject hashing algorithm has been changed between
1839 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1840 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001841 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001842 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 ctx.verify_mode = ssl.CERT_REQUIRED
1844 ctx.load_verify_locations(capath=CAPATH)
1845 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1846 s.connect(self.server_addr)
1847 cert = s.getpeercert()
1848 self.assertTrue(cert)
1849 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001850 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001851 ctx.verify_mode = ssl.CERT_REQUIRED
1852 ctx.load_verify_locations(capath=BYTES_CAPATH)
1853 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1854 s.connect(self.server_addr)
1855 cert = s.getpeercert()
1856 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001857
def test_connect_cadata(self):
    # Verify the server certificate against CA data passed in memory,
    # first as PEM text and then as DER bytes.
    with open(SIGNING_CA) as ca_file:
        pem_data = ca_file.read()
    der_data = ssl.PEM_cert_to_DER_cert(pem_data)

    for ca_data in (pem_data, der_data):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        # TODO: fix TLSv1.3 support
        context.options |= ssl.OP_NO_TLSv1_3
        context.load_verify_locations(cadata=ca_data)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            self.assertTrue(client.getpeercert())
Christian Heimesefff7062013-11-21 03:35:02 +01001882
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    tls_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    tls_sock.connect(self.server_addr)
    raw_fd = tls_sock.fileno()
    stream = tls_sock.makefile()
    stream.close()
    # The fd is still open
    os.read(raw_fd, 0)
    # Closing the SSL socket should close the fd too
    tls_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(raw_fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001901
def test_non_blocking_handshake(self):
    # Drive the handshake by hand on a non-blocking socket, waiting with
    # select() each time the SSL layer asks for more I/O.
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    tls = test_wrap_socket(plain,
                           cert_reqs=ssl.CERT_NONE,
                           do_handshake_on_connect=False)
    self.addCleanup(tls.close)
    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            # Block until the peer has sent something to read.
            select.select([tls], [], [])
        except ssl.SSLWantWriteError:
            # Block until the socket can accept more outgoing data.
            select.select([], [tls], [])
        else:
            handshake_done = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001922
def test_get_server_certificate(self):
    # Fetch the certificate of the server at self.server_addr, first
    # unverified and then verified against SIGNING_CA.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001925
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001930
def test_ciphers(self):
    # Well-formed cipher strings must allow a connection to be made.
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting,
    # so both steps sit inside the assertRaisesRegex block.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"), \
            socket.socket(socket.AF_INET) as sock:
        conn = test_wrap_socket(sock,
                                cert_reqs=ssl.CERT_NONE,
                                ciphers="^$:,;?*'dorothyx")
        conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001944
def test_get_ca_certs_capath(self):
    # Certificates found through capath are loaded on request: none are
    # reported before a connection, one is reported afterwards.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])
    raw = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001956
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    contexts = []
    for _ in range(2):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(capath=CAPATH)
        contexts.append(ctx)
    initial_ctx, replacement_ctx = contexts

    raw = socket.socket(socket.AF_INET)
    with initial_ctx.wrap_socket(raw, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level SSL object
        # start out on the context that created them ...
        self.assertIs(conn.context, initial_ctx)
        self.assertIs(conn._sslobj.context, initial_ctx)
        # ... and both must follow when the context is swapped.
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001972
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    # A simple IO loop: keep calling func(*args) until it stops asking
    # for more I/O (WANT_READ or WANT_WRITE), moving data between the
    # socket and the two BIOs after every call.  The only keyword that is
    # looked at is 'timeout' (seconds, default 10); it is not forwarded
    # to func.
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            # Anything other than "need more I/O" is a real failure.
            if exc.errno not in retryable:
                raise
            wanted = exc.errno
        else:
            wanted = None
        # Whatever the outcome, flush what the SSL object produced to
        # the socket.
        sock.sendall(outgoing.read())
        if wanted is None:
            # func() completed without error: we are done.
            break
        if wanted == ssl.SSL_ERROR_WANT_READ:
            # Feed the SSL object from the socket; an empty read means
            # the peer closed the connection.
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return ret
2009
def test_bio_handshake(self):
    # Handshake and shutdown of an SSLObject that talks through memory
    # BIOs, with self.ssl_io_loop() shuttling bytes to a real socket.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    pump = functools.partial(self.ssl_io_loop, sock, incoming, outgoing)

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    ctx.load_verify_locations(SIGNING_CA)
    sslobj = ctx.wrap_bio(incoming, outgoing,
                          server_side=False,
                          server_hostname=SIGNED_CERTFILE_HOSTNAME)
    self.assertIs(sslobj._sslobj.owner, sslobj)

    # State before the handshake.
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    pump(sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        pump(sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2043
def test_bio_read_write_data(self):
    # Send one request through a BIO-backed SSLObject and check the
    # reply read back from the server.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    pump = functools.partial(self.ssl_io_loop, sock, incoming, outgoing)

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_NONE
    sslobj = ctx.wrap_bio(incoming, outgoing, server_side=False)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002059
2060
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each body runs under
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            conn = test_wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            # A timeout this short is expected to expire before the
            # connection can complete.
            conn.settimeout(0.0000001)
            result = conn.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2082
Martin Panter3840b2a2016-03-27 01:53:46 +00002083
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of (host, port) twice and fail *test* if empty.

    The first fetch passes no CA file; the second passes *cert* as
    ``ca_certs``.  The second PEM is printed in verbose mode.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))
2094
def _test_get_server_certificate_fail(test, host, port):
    """Fail *test* unless fetching the certificate raises ssl.SSLError.

    The fetch passes CERTFILE as ``ca_certs``; the caller expects that to
    be rejected.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2104
2105
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002106from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002107
class ThreadedEchoServer(threading.Thread):
    # Test server running in its own thread.  It accepts TCP connections
    # on a port obtained from support.bind_port() and hands each one to a
    # ConnectionHandler thread, which it joins before accepting the next
    # connection (so connections are served one at a time).  Handlers
    # answer each message with its lower-cased copy and understand a few
    # control messages (see ConnectionHandler.run).
    #
    # Used as a context manager: __enter__ starts the thread and waits
    # until it is listening; __exit__ stops and joins it.

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            # server:   the owning ThreadedEchoServer
            # connsock: the accepted (plain) socket
            # addr:     peer address, only used in diagnostics
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is unencrypted, otherwise the
            # SSL-wrapped socket.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            # Wrap self.sock with the server's SSL context (server side).
            # Returns True on success.  On failure the error text is
            # recorded in server.conn_errors, the whole server is asked
            # to stop, the connection is closed and False is returned.
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    # handle_error is defined elsewhere in this file;
                    # presumably it reports the current exception.
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded: record what was negotiated so the
                # test that owns the server can inspect it.
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            # Read from the SSL connection when there is one, otherwise
            # from the plain socket.
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            # Write through the SSL connection when there is one,
            # otherwise through the plain socket.
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            # Close whichever socket object is currently in use.
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            # Serve one connection.  Unless the server runs in STARTTLS
            # mode the connection is wrapped right away.  Each message is
            # then dispatched on its stripped content:
            #   (empty)           end of stream: unwrap if possible, close
            #   b'over'           close the connection and return
            #   b'STARTTLS'       (STARTTLS mode) reply OK, switch to TLS
            #   b'ENDTLS'         (STARTTLS mode, encrypted) reply OK,
            #                     switch back to plain text
            #   b'CB tls-unique'  reply with the channel binding data
            #   anything else     reply with the lower-cased message
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            # NOTE(review): assumes self.sslconn is set
                            # here; only OSError is handled - confirm for
                            # unencrypted STARTTLS connections.
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Drop back to the plain socket returned by
                        # unwrap().
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        # The binding data is sent as the ASCII repr() of
                        # the value, followed by a newline.
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        # When *context* is given it is used as is and certificate,
        # ssl_version, certreqs, cacerts, npn_protocols, alpn_protocols
        # and ciphers are ignored.  Otherwise a new SSLContext is built
        # from them (defaults: PROTOCOL_TLS_SERVER and CERT_NONE).
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        # chatty gates general diagnostics, connectionchatty the
        # per-message ones; both also require support.verbose when
        # writing to stdout.
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event, set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by the connection handlers, one entry per
        # connection (see ConnectionHandler.wrap_conn).
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals that it is
        # listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        # Like Thread.start(), but remembers an optional event that run()
        # sets once the server is listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        # Accept loop.  The short accept() timeout lets the loop notice
        # that stop() cleared self.active.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for the handler to finish before accepting again.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        # Ask the accept loop in run() to finish.
        self.active = False
2326
class AsyncoreEchoServer(threading.Thread):
    # Echo server thread that runs asyncore.loop().  Used as a context
    # manager: __enter__ starts the thread and waits for it to signal
    # readiness; __exit__ stops it, joins it and closes all asyncore
    # channels.

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        # Listening dispatcher: creates a ConnectionHandler for every
        # accepted connection.

        class ConnectionHandler(asyncore.dispatcher_with_send):
            # Per-connection dispatcher.  The socket is SSL-wrapped
            # without handshaking on connect; the handshake is driven by
            # _do_ssl_handshake(), and afterwards received data is sent
            # back lower-cased.

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data that the SSL socket reports as pending
                # before answering; always reports the channel as
                # readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Make one attempt at the handshake.  The order of the
                # except clauses matters: the specific SSL errors are
                # handled before the generic SSLError (re-raised), which
                # in turn comes before OSError.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; handle_read() tries again.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # Any OSError other than ECONNABORTED is ignored and
                    # leaves the handler in the "accepting" state.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                # Continue the handshake while it is pending; afterwards
                # send received data back lower-cased, or close when an
                # empty read is returned.
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            # Bind a listening socket via support.bind_port() and
            # remember the certificate file for the connection handlers.
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler is not stored here.
            # NOTE(review): presumably asyncore's socket map keeps it
            # alive - confirm.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        # Optional event, set by run() once the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() has set the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        # Like Thread.start(), but remembers an optional event that run()
        # sets before entering its loop.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Every exception is swallowed so the loop keeps running
                # until stop() clears self.active.
                pass

    def stop(self):
        # End the loop in run() and close the listening dispatcher.
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002444
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Run a ThreadedEchoServer with *server_context*, connect one client
    built from *client_context* and exchange *indata* with it.

    *indata* is sent three times (as given, as a bytearray and as a
    memoryview); every reply must equal ``indata.lower()``, otherwise
    AssertionError is raised.  Returns a dict describing the client
    connection plus the protocol and cipher lists recorded by the server.
    """
    # Note that the server is always created with connectionchatty=False;
    # the *connectionchatty* argument only controls the client-side
    # messages written below.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        raw = socket.socket()
        with client_context.wrap_socket(raw, server_hostname=sni_name,
                                        session=session) as conn:
            conn.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: sending %r...\n" % indata)
                conn.write(payload)
                outdata = conn.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the server's handler that this connection is finished.
            conn.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = dict(
                compression=conn.compression(),
                cipher=conn.cipher(),
                peercert=conn.getpeercert(),
                client_alpn_protocol=conn.selected_alpn_protocol(),
                client_npn_protocol=conn.selected_npn_protocol(),
                version=conn.version(),
                session_reused=conn.session_reused,
                session=conn.session,
            )
            conn.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002494
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Connect a *client_protocol* client to a *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false
    one that it must fail.  When *expect_success* is a string it must
    also equal the protocol version reported by the connection.
    AssertionError is raised when the outcome is not the expected one.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations that are expected to fail are shown in braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2553
2554
2555class ThreadedTests(unittest.TestCase):
2556
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Protocols usable on both sides: one context plays both roles.
    # The role-specific protocols are covered further down.
    for protocol in PROTOCOLS:
        if protocol not in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                shared = ssl.SSLContext(protocol)
                shared.load_cert_chain(CERTFILE)
                server_params_test(shared, shared,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Each context used in its proper role: must succeed.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    # Contexts used in a role they were not created for: must fail.
    client_context.check_hostname = False
    # NOTE(review): the last row is labelled client/client but passes
    # server_context as the client, i.e. the same combination as the
    # first row - confirm whether that is intended.
    wrong_role_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in wrong_role_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(caught.exception))
2604
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002605 def test_getpeercert(self):
2606 if support.verbose:
2607 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002608
2609 client_context, server_context, hostname = testing_context()
2610 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002612 with client_context.wrap_socket(socket.socket(),
2613 do_handshake_on_connect=False,
2614 server_hostname=hostname) as s:
2615 s.connect((HOST, server.port))
2616 # getpeercert() raise ValueError while the handshake isn't
2617 # done.
2618 with self.assertRaises(ValueError):
2619 s.getpeercert()
2620 s.do_handshake()
2621 cert = s.getpeercert()
2622 self.assertTrue(cert, "Can't get peer certificate.")
2623 cipher = s.cipher()
2624 if support.verbose:
2625 sys.stdout.write(pprint.pformat(cert) + '\n')
2626 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2627 if 'subject' not in cert:
2628 self.fail("No subject field in certificate: %s." %
2629 pprint.pformat(cert))
2630 if ((('organizationName', 'Python Software Foundation'),)
2631 not in cert['subject']):
2632 self.fail(
2633 "Missing or invalid 'organizationName' field in certificate subject; "
2634 "should be 'Python Software Foundation'.")
2635 self.assertIn('notBefore', cert)
2636 self.assertIn('notAfter', cert)
2637 before = ssl.cert_time_to_seconds(cert['notBefore'])
2638 after = ssl.cert_time_to_seconds(cert['notAfter'])
2639 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002640
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002641 @unittest.skipUnless(have_verify_flags(),
2642 "verify_flags need OpenSSL > 0.9.8")
2643 def test_crl_check(self):
2644 if support.verbose:
2645 sys.stdout.write("\n")
2646
Christian Heimesa170fa12017-09-15 20:27:30 +02002647 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002648
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002649 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002650 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651
2652 # VERIFY_DEFAULT should pass
2653 server = ThreadedEchoServer(context=server_context, chatty=True)
2654 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002655 with client_context.wrap_socket(socket.socket(),
2656 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002657 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002658 cert = s.getpeercert()
2659 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002660
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002661 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002662 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002663
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664 server = ThreadedEchoServer(context=server_context, chatty=True)
2665 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002666 with client_context.wrap_socket(socket.socket(),
2667 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002668 with self.assertRaisesRegex(ssl.SSLError,
2669 "certificate verify failed"):
2670 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002671
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002673 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002674
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002675 server = ThreadedEchoServer(context=server_context, chatty=True)
2676 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002677 with client_context.wrap_socket(socket.socket(),
2678 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002679 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002680 cert = s.getpeercert()
2681 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002682
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683 def test_check_hostname(self):
2684 if support.verbose:
2685 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002686
Christian Heimesa170fa12017-09-15 20:27:30 +02002687 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002688
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002689 # correct hostname should verify
2690 server = ThreadedEchoServer(context=server_context, chatty=True)
2691 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002692 with client_context.wrap_socket(socket.socket(),
2693 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002694 s.connect((HOST, server.port))
2695 cert = s.getpeercert()
2696 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002697
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 # incorrect hostname should raise an exception
2699 server = ThreadedEchoServer(context=server_context, chatty=True)
2700 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002701 with client_context.wrap_socket(socket.socket(),
2702 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002703 with self.assertRaisesRegex(
2704 ssl.CertificateError,
2705 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002707
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002708 # missing server_hostname arg should cause an exception, too
2709 server = ThreadedEchoServer(context=server_context, chatty=True)
2710 with server:
2711 with socket.socket() as s:
2712 with self.assertRaisesRegex(ValueError,
2713 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002714 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002715
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002716 def test_ecc_cert(self):
2717 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2718 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002719 client_context.set_ciphers(
2720 'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
2721 'ECDHE:ECDSA:!NULL:!aRSA'
2722 )
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002723 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2724
2725 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2726 # load ECC cert
2727 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2728
2729 # correct hostname should verify
2730 server = ThreadedEchoServer(context=server_context, chatty=True)
2731 with server:
2732 with client_context.wrap_socket(socket.socket(),
2733 server_hostname=hostname) as s:
2734 s.connect((HOST, server.port))
2735 cert = s.getpeercert()
2736 self.assertTrue(cert, "Can't get peer certificate.")
2737 cipher = s.cipher()[0].split('-')
2738 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2739
2740 def test_dual_rsa_ecc(self):
2741 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2742 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002743 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2744 # algorithms.
2745 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002746 # only ECDSA certs
2747 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2748 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2749
2750 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2751 # load ECC and RSA key/cert pairs
2752 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2753 server_context.load_cert_chain(SIGNED_CERTFILE)
2754
2755 # correct hostname should verify
2756 server = ThreadedEchoServer(context=server_context, chatty=True)
2757 with server:
2758 with client_context.wrap_socket(socket.socket(),
2759 server_hostname=hostname) as s:
2760 s.connect((HOST, server.port))
2761 cert = s.getpeercert()
2762 self.assertTrue(cert, "Can't get peer certificate.")
2763 cipher = s.cipher()[0].split('-')
2764 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2765
Christian Heimes66e57422018-01-29 14:25:13 +01002766 def test_check_hostname_idn(self):
2767 if support.verbose:
2768 sys.stdout.write("\n")
2769
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002770 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002771 server_context.load_cert_chain(IDNSANSFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002772 # TODO: fix TLSv1.3 support
2773 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimes66e57422018-01-29 14:25:13 +01002774
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002775 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002776 context.verify_mode = ssl.CERT_REQUIRED
2777 context.check_hostname = True
2778 context.load_verify_locations(SIGNING_CA)
2779
2780 # correct hostname should verify, when specified in several
2781 # different ways
2782 idn_hostnames = [
2783 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002784 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002785 ('xn--knig-5qa.idn.pythontest.net',
2786 'xn--knig-5qa.idn.pythontest.net'),
2787 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002788 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002789
2790 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002791 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002792 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2793 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2794 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002795 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2796
2797 # ('königsgäßchen.idna2008.pythontest.net',
2798 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2799 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2800 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2801 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2802 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2803
Christian Heimes66e57422018-01-29 14:25:13 +01002804 ]
2805 for server_hostname, expected_hostname in idn_hostnames:
2806 server = ThreadedEchoServer(context=server_context, chatty=True)
2807 with server:
2808 with context.wrap_socket(socket.socket(),
2809 server_hostname=server_hostname) as s:
2810 self.assertEqual(s.server_hostname, expected_hostname)
2811 s.connect((HOST, server.port))
2812 cert = s.getpeercert()
2813 self.assertEqual(s.server_hostname, expected_hostname)
2814 self.assertTrue(cert, "Can't get peer certificate.")
2815
Christian Heimes66e57422018-01-29 14:25:13 +01002816 # incorrect hostname should raise an exception
2817 server = ThreadedEchoServer(context=server_context, chatty=True)
2818 with server:
2819 with context.wrap_socket(socket.socket(),
2820 server_hostname="python.example.org") as s:
2821 with self.assertRaises(ssl.CertificateError):
2822 s.connect((HOST, server.port))
2823
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824 def test_wrong_cert(self):
2825 """Connecting when the server rejects the client's certificate
2826
2827 Launch a server with CERT_REQUIRED, and check that trying to
2828 connect to it with a wrong client certificate fails.
2829 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002830 client_context, server_context, hostname = testing_context()
2831 # load client cert
2832 client_context.load_cert_chain(WRONG_CERT)
2833 # require TLS client authentication
2834 server_context.verify_mode = ssl.CERT_REQUIRED
2835 # TODO: fix TLSv1.3 support
2836 # With TLS 1.3, test fails with exception in server thread
2837 server_context.options |= ssl.OP_NO_TLSv1_3
2838
2839 server = ThreadedEchoServer(
2840 context=server_context, chatty=True, connectionchatty=True,
2841 )
2842
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002843 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002844 client_context.wrap_socket(socket.socket(),
2845 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002846 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002847 # Expect either an SSL error about the server rejecting
2848 # the connection, or a low-level connection reset (which
2849 # sometimes happens on Windows)
2850 s.connect((HOST, server.port))
2851 except ssl.SSLError as e:
2852 if support.verbose:
2853 sys.stdout.write("\nSSLError is %r\n" % e)
2854 except OSError as e:
2855 if e.errno != errno.ECONNRESET:
2856 raise
2857 if support.verbose:
2858 sys.stdout.write("\nsocket.error is %r\n" % e)
2859 else:
2860 self.fail("Use of invalid cert should have failed!")
2861
2862 def test_rude_shutdown(self):
2863 """A brutal shutdown of an SSL server should raise an OSError
2864 in the client when attempting handshake.
2865 """
2866 listener_ready = threading.Event()
2867 listener_gone = threading.Event()
2868
2869 s = socket.socket()
2870 port = support.bind_port(s, HOST)
2871
2872 # `listener` runs in a thread. It sits in an accept() until
2873 # the main thread connects. Then it rudely closes the socket,
2874 # and sets Event `listener_gone` to let the main thread know
2875 # the socket is gone.
2876 def listener():
2877 s.listen()
2878 listener_ready.set()
2879 newsock, addr = s.accept()
2880 newsock.close()
2881 s.close()
2882 listener_gone.set()
2883
2884 def connector():
2885 listener_ready.wait()
2886 with socket.socket() as c:
2887 c.connect((HOST, port))
2888 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002889 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002890 ssl_sock = test_wrap_socket(c)
2891 except OSError:
2892 pass
2893 else:
2894 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002895
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002896 t = threading.Thread(target=listener)
2897 t.start()
2898 try:
2899 connector()
2900 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002901 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002902
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002903 def test_ssl_cert_verify_error(self):
2904 if support.verbose:
2905 sys.stdout.write("\n")
2906
2907 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2908 server_context.load_cert_chain(SIGNED_CERTFILE)
2909
2910 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2911
2912 server = ThreadedEchoServer(context=server_context, chatty=True)
2913 with server:
2914 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002915 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002916 try:
2917 s.connect((HOST, server.port))
2918 except ssl.SSLError as e:
2919 msg = 'unable to get local issuer certificate'
2920 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2921 self.assertEqual(e.verify_code, 20)
2922 self.assertEqual(e.verify_message, msg)
2923 self.assertIn(msg, repr(e))
2924 self.assertIn('certificate verify failed', repr(e))
2925
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002926 @skip_if_broken_ubuntu_ssl
2927 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2928 "OpenSSL is compiled without SSLv2 support")
2929 def test_protocol_sslv2(self):
2930 """Connecting to an SSLv2 server with various client options"""
2931 if support.verbose:
2932 sys.stdout.write("\n")
2933 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2934 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2935 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002936 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2938 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2939 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2940 # SSLv23 client with specific SSL options
2941 if no_sslv2_implies_sslv3_hello():
2942 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002943 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002944 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002945 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002946 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002947 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002949
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002950 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002951 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 """Connecting to an SSLv23 server with various client options"""
2953 if support.verbose:
2954 sys.stdout.write("\n")
2955 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002956 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002957 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002958 except OSError as x:
2959 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2960 if support.verbose:
2961 sys.stdout.write(
2962 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2963 % str(x))
2964 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002965 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2966 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2967 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002968
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002969 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002970 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2971 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2972 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973
2974 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002975 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2976 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2977 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002978
2979 # Server with specific SSL options
2980 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002981 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002982 server_options=ssl.OP_NO_SSLv3)
2983 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002984 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002985 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002986 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002987 server_options=ssl.OP_NO_TLSv1)
2988
2989
2990 @skip_if_broken_ubuntu_ssl
2991 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2992 "OpenSSL is compiled without SSLv3 support")
2993 def test_protocol_sslv3(self):
2994 """Connecting to an SSLv3 server with various client options"""
2995 if support.verbose:
2996 sys.stdout.write("\n")
2997 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2998 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2999 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3000 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3001 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003002 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003003 client_options=ssl.OP_NO_SSLv3)
3004 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3005 if no_sslv2_implies_sslv3_hello():
3006 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003007 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003008 False, client_options=ssl.OP_NO_SSLv2)
3009
3010 @skip_if_broken_ubuntu_ssl
3011 def test_protocol_tlsv1(self):
3012 """Connecting to a TLSv1 server with various client options"""
3013 if support.verbose:
3014 sys.stdout.write("\n")
3015 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3016 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3017 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3018 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3019 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3020 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3021 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003022 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003023 client_options=ssl.OP_NO_TLSv1)
3024
3025 @skip_if_broken_ubuntu_ssl
3026 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3027 "TLS version 1.1 not supported.")
3028 def test_protocol_tlsv1_1(self):
3029 """Connecting to a TLSv1.1 server with various client options.
3030 Testing against older TLS versions."""
3031 if support.verbose:
3032 sys.stdout.write("\n")
3033 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3034 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3035 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3036 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3037 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003038 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003039 client_options=ssl.OP_NO_TLSv1_1)
3040
Christian Heimesa170fa12017-09-15 20:27:30 +02003041 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003042 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3043 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3044
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003045 @skip_if_broken_ubuntu_ssl
3046 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3047 "TLS version 1.2 not supported.")
3048 def test_protocol_tlsv1_2(self):
3049 """Connecting to a TLSv1.2 server with various client options.
3050 Testing against older TLS versions."""
3051 if support.verbose:
3052 sys.stdout.write("\n")
3053 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3054 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3055 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3056 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3057 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3058 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3059 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003060 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003061 client_options=ssl.OP_NO_TLSv1_2)
3062
Christian Heimesa170fa12017-09-15 20:27:30 +02003063 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003064 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3065 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3066 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3067 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3068
3069 def test_starttls(self):
3070 """Switching from clear text to encrypted and back again."""
3071 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3072
3073 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003074 starttls_server=True,
3075 chatty=True,
3076 connectionchatty=True)
3077 wrapped = False
3078 with server:
3079 s = socket.socket()
3080 s.setblocking(1)
3081 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003082 if support.verbose:
3083 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003085 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 sys.stdout.write(
3087 " client: sending %r...\n" % indata)
3088 if wrapped:
3089 conn.write(indata)
3090 outdata = conn.read()
3091 else:
3092 s.send(indata)
3093 outdata = s.recv(1024)
3094 msg = outdata.strip().lower()
3095 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3096 # STARTTLS ok, switch to secure mode
3097 if support.verbose:
3098 sys.stdout.write(
3099 " client: read %r from server, starting TLS...\n"
3100 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003101 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003102 wrapped = True
3103 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3104 # ENDTLS ok, switch back to clear text
3105 if support.verbose:
3106 sys.stdout.write(
3107 " client: read %r from server, ending TLS...\n"
3108 % msg)
3109 s = conn.unwrap()
3110 wrapped = False
3111 else:
3112 if support.verbose:
3113 sys.stdout.write(
3114 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003115 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003116 sys.stdout.write(" client: closing connection.\n")
3117 if wrapped:
3118 conn.write(b"over\n")
3119 else:
3120 s.send(b"over\n")
3121 if wrapped:
3122 conn.close()
3123 else:
3124 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003125
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003126 def test_socketserver(self):
3127 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003128 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003129 # try to connect
3130 if support.verbose:
3131 sys.stdout.write('\n')
3132 with open(CERTFILE, 'rb') as f:
3133 d1 = f.read()
3134 d2 = ''
3135 # now fetch the same data from the HTTPS server
3136 url = 'https://localhost:%d/%s' % (
3137 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003138 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003139 f = urllib.request.urlopen(url, context=context)
3140 try:
3141 dlen = f.info().get("content-length")
3142 if dlen and (int(dlen) > 0):
3143 d2 = f.read(int(dlen))
3144 if support.verbose:
3145 sys.stdout.write(
3146 " client: read %d bytes from remote server '%s'\n"
3147 % (len(d2), server))
3148 finally:
3149 f.close()
3150 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003151
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003152 def test_asyncore_server(self):
3153 """Check the example asyncore integration."""
3154 if support.verbose:
3155 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003156
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003157 indata = b"FOO\n"
3158 server = AsyncoreEchoServer(CERTFILE)
3159 with server:
3160 s = test_wrap_socket(socket.socket())
3161 s.connect(('127.0.0.1', server.port))
3162 if support.verbose:
3163 sys.stdout.write(
3164 " client: sending %r...\n" % indata)
3165 s.write(indata)
3166 outdata = s.read()
3167 if support.verbose:
3168 sys.stdout.write(" client: read %r\n" % outdata)
3169 if outdata != indata.lower():
3170 self.fail(
3171 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3172 % (outdata[:20], len(outdata),
3173 indata[:20].lower(), len(indata)))
3174 s.write(b"over\n")
3175 if support.verbose:
3176 sys.stdout.write(" client: closing connection.\n")
3177 s.close()
3178 if support.verbose:
3179 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003180
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003181 def test_recv_send(self):
3182 """Test recv(), send() and friends."""
3183 if support.verbose:
3184 sys.stdout.write("\n")
3185
3186 server = ThreadedEchoServer(CERTFILE,
3187 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003188 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003189 cacerts=CERTFILE,
3190 chatty=True,
3191 connectionchatty=False)
3192 with server:
3193 s = test_wrap_socket(socket.socket(),
3194 server_side=False,
3195 certfile=CERTFILE,
3196 ca_certs=CERTFILE,
3197 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003198 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003199 s.connect((HOST, server.port))
3200 # helper methods for standardising recv* method signatures
3201 def _recv_into():
3202 b = bytearray(b"\0"*100)
3203 count = s.recv_into(b)
3204 return b[:count]
3205
3206 def _recvfrom_into():
3207 b = bytearray(b"\0"*100)
3208 count, addr = s.recvfrom_into(b)
3209 return b[:count]
3210
3211 # (name, method, expect success?, *args, return value func)
3212 send_methods = [
3213 ('send', s.send, True, [], len),
3214 ('sendto', s.sendto, False, ["some.address"], len),
3215 ('sendall', s.sendall, True, [], lambda x: None),
3216 ]
3217 # (name, method, whether to expect success, *args)
3218 recv_methods = [
3219 ('recv', s.recv, True, []),
3220 ('recvfrom', s.recvfrom, False, ["some.address"]),
3221 ('recv_into', _recv_into, True, []),
3222 ('recvfrom_into', _recvfrom_into, False, []),
3223 ]
3224 data_prefix = "PREFIX_"
3225
3226 for (meth_name, send_meth, expect_success, args,
3227 ret_val_meth) in send_methods:
3228 indata = (data_prefix + meth_name).encode('ascii')
3229 try:
3230 ret = send_meth(indata, *args)
3231 msg = "sending with {}".format(meth_name)
3232 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3233 outdata = s.read()
3234 if outdata != indata.lower():
3235 self.fail(
3236 "While sending with <<{name:s}>> bad data "
3237 "<<{outdata:r}>> ({nout:d}) received; "
3238 "expected <<{indata:r}>> ({nin:d})\n".format(
3239 name=meth_name, outdata=outdata[:20],
3240 nout=len(outdata),
3241 indata=indata[:20], nin=len(indata)
3242 )
3243 )
3244 except ValueError as e:
3245 if expect_success:
3246 self.fail(
3247 "Failed to send with method <<{name:s}>>; "
3248 "expected to succeed.\n".format(name=meth_name)
3249 )
3250 if not str(e).startswith(meth_name):
3251 self.fail(
3252 "Method <<{name:s}>> failed with unexpected "
3253 "exception message: {exp:s}\n".format(
3254 name=meth_name, exp=e
3255 )
3256 )
3257
3258 for meth_name, recv_meth, expect_success, args in recv_methods:
3259 indata = (data_prefix + meth_name).encode('ascii')
3260 try:
3261 s.send(indata)
3262 outdata = recv_meth(*args)
3263 if outdata != indata.lower():
3264 self.fail(
3265 "While receiving with <<{name:s}>> bad data "
3266 "<<{outdata:r}>> ({nout:d}) received; "
3267 "expected <<{indata:r}>> ({nin:d})\n".format(
3268 name=meth_name, outdata=outdata[:20],
3269 nout=len(outdata),
3270 indata=indata[:20], nin=len(indata)
3271 )
3272 )
3273 except ValueError as e:
3274 if expect_success:
3275 self.fail(
3276 "Failed to receive with method <<{name:s}>>; "
3277 "expected to succeed.\n".format(name=meth_name)
3278 )
3279 if not str(e).startswith(meth_name):
3280 self.fail(
3281 "Method <<{name:s}>> failed with unexpected "
3282 "exception message: {exp:s}\n".format(
3283 name=meth_name, exp=e
3284 )
3285 )
3286 # consume data
3287 s.read()
3288
3289 # read(-1, buffer) is supported, even though read(-1) is not
3290 data = b"data"
3291 s.send(data)
3292 buffer = bytearray(len(data))
3293 self.assertEqual(s.read(-1, buffer), len(data))
3294 self.assertEqual(buffer, data)
3295
Christian Heimes888bbdc2017-09-07 14:18:21 -07003296 # sendall accepts bytes-like objects
3297 if ctypes is not None:
3298 ubyte = ctypes.c_ubyte * len(data)
3299 byteslike = ubyte.from_buffer_copy(data)
3300 s.sendall(byteslike)
3301 self.assertEqual(s.read(), data)
3302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003303 # Make sure sendmsg et al are disallowed to avoid
3304 # inadvertent disclosure of data and/or corruption
3305 # of the encrypted data stream
3306 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3307 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3308 self.assertRaises(NotImplementedError,
3309 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 s.write(b"over\n")
3311
3312 self.assertRaises(ValueError, s.recv, -1)
3313 self.assertRaises(ValueError, s.read, -1)
3314
3315 s.close()
3316
def test_recv_zero(self):
    # Zero-length reads on a TLS socket must hand back an empty result,
    # both while application data is pending and, in non-blocking mode,
    # when the peer has nothing to send.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv/read(0) should return no data
    payload = b"data"
    tls_sock.send(payload)
    for zero_length_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_length_read(0), b"")
    # The reply itself is still waiting for a normal read.
    self.assertEqual(tls_sock.read(), payload)

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    self.assertEqual(tls_sock.recv_into(bytearray()), 0)
3336
def test_nonblocking_send(self):
    # A non-blocking TLS socket must raise a "want read/write" error
    # instead of blocking once its send path can take no more data.
    server_options = dict(
        certreqs=ssl.CERT_NONE,
        ssl_version=ssl.PROTOCOL_TLS_SERVER,
        cacerts=CERTFILE,
        chatty=True,
        connectionchatty=False,
    )
    client_options = dict(
        server_side=False,
        certfile=CERTFILE,
        ca_certs=CERTFILE,
        cert_reqs=ssl.CERT_NONE,
        ssl_version=ssl.PROTOCOL_TLS_CLIENT,
    )
    server = ThreadedEchoServer(CERTFILE, **server_options)
    with server:
        client = test_wrap_socket(socket.socket(), **client_options)
        client.connect((HOST, server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Restore blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
3366
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        # Accept plain TCP connections and then stay silent; nothing
        # here ever answers a TLS handshake.
        listener.listen()
        listening.set()
        held = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted, _ = listener.accept()
                held.append(accepted)
        for conn in held:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: connect first, then wrap -- the handshake runs inside
        # test_wrap_socket() and must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect -- the handshake runs inside
        # connect() and must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell the helper thread to leave its loop, then clean up.
        finish = True
        worker.join()
        listener.close()
3415
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    listening = threading.Event()
    accepted_sock = None
    accepted_addr = None

    def serve():
        nonlocal accepted_sock, accepted_addr
        listener.listen()
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted_sock, accepted_addr = listener.accept()
        accepted_sock.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted_sock.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted_sock, ssl.SSLSocket)
    self.assertEqual(accepted_addr, client_addr)
3454
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with OSError(ENOTCONN).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3461
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with OSError(ENOTCONN).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3468
def test_default_ciphers(self):
    # A client restricted to DES ciphers must fail to connect to a
    # server left on its default cipher list, and the server must
    # record a "no shared cipher" error.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLS,
                                chatty=False)
    with server:
        with client_ctx.wrap_socket(socket.socket()) as client, \
                self.assertRaises(OSError):
            client.connect((HOST, server.port))
    # Inspected only once the server context manager has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3483
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # Before connecting there is no TLS session object and
            # therefore no negotiated version.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            # Use the module-level IS_OPENSSL_1_1_1 flag rather than the
            # raw version tuple: LibreSSL also reports a tuple that
            # compares >= (1, 1, 1), and the flag excludes it.  Also
            # require that TLSv1.3 is actually available in this build.
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Leaving the socket's context manager closes it, which must
        # reset both the session object and the reported version.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3507
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0, 1.1 and 1.2 all disabled, the connection must
    # negotiate TLSv1.3 with one of the listed TLS 1.3 cipher names.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    ctx.options |= older_versions_off
    acceptable_ciphers = {
        'TLS13-AES-256-GCM-SHA384',
        'TLS13-CHACHA20-POLY1305-SHA256',
        'TLS13-AES-128-GCM-SHA256',
    }
    # The same context is used for both ends of the connection.
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            negotiated = client.cipher()
            self.assertIn(negotiated[0], acceptable_ciphers)
            self.assertEqual(client.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003525
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check that SSLContext.minimum_version / maximum_version bound the
    # negotiated protocol, and that disjoint ranges fail the handshake.
    client_context, server_context, hostname = testing_context()
    versions = ssl.TLSVersion

    def set_range(ctx, lowest, highest):
        # Minimum is assigned before maximum.
        ctx.minimum_version = lowest
        ctx.maximum_version = highest

    def expect_version(expected):
        # Connect once and check the negotiated version while the
        # connection is still open.
        with ThreadedEchoServer(context=server_context) as server:
            with client_context.wrap_socket(
                    socket.socket(), server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(s.version(), expected)

    # client TLSv1.0 to 1.2
    set_range(client_context, versions.TLSv1, versions.TLSv1_2)
    # server only TLSv1.2
    set_range(server_context, versions.TLSv1_2, versions.TLSv1_2)
    expect_version('TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    set_range(server_context, versions.TLSv1, versions.TLSv1_1)
    expect_version('TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    set_range(server_context, versions.TLSv1_2, versions.TLSv1_2)
    set_range(client_context, versions.TLSv1, versions.TLSv1)
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3564
3565
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # A client pinned to SSLv3 must negotiate SSLv3 with a server whose
    # minimum version has been lowered to SSLv3.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client:
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'SSLv3')
3579
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3599
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    # TODO: fix TLSv1.3 support
    client_context.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the value from *this* connection; the first
            # connection's cb_data was already validated above, so
            # re-checking it here would leave new_cb_data unverified.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003655
def test_compression(self):
    # The compression reported for a default handshake must be one of
    # the known values (or None).
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(negotiated, allowed)
3664
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both contexts, the handshake must
    # report no compression.
    client_context, server_context, hostname = testing_context()
    # Client first, then server.
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
3675
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"] is indexed for its first element, the cipher
    # name, which is split on "-" to look for a DH key-exchange token.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if {"ADH", "EDH", "DHE"}.isdisjoint(parts):
        # Report the full cipher name; `cipher` is already the name
        # string, so indexing it again would show only one character.
        self.fail("Non-DH cipher: " + cipher)
3691
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    # Exercise SSLContext.set_ecdh_curve() on either side of the
    # connection, then with deliberately different curves on each side.

    def restrict_server(ctx):
        # Limit the server to ECDHE ciphers and disable TLS 1.0/1.1.
        ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    def handshake(client_ctx, server_ctx, sni):
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    try:
        handshake(client_context, server_context, hostname)
    except ssl.SSLError:
        rejected = True
    else:
        rejected = False
    # OpenSSL 1.0.2 does not fail although it should.
    if not rejected and IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3730
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_context, server_context, hostname = testing_context()
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3738
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_context, server_context, hostname = testing_context()
    # Only the server advertises protocols here.
    server_only_protocols = ['foo', 'bar']
    server_context.set_alpn_protocols(server_only_protocols)
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3748
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which ALPN protocol both
    # sides report against a fixed server protocol list.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (client protocols, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Keep either the stats dict or the handshake error so both
        # outcomes can be inspected below.
        try:
            outcome = server_params_test(client_context,
                                         server_context,
                                         chatty=True,
                                         connectionchatty=True,
                                         sni_name=hostname)
        except ssl.SSLError as exc:
            outcome = exc

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        # Pre-fill the scenario; %%s placeholders stay open for the
        # observed value and the side that reported it.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = outcome['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3788
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_context, server_context, hostname = testing_context()
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3796
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which NPN protocol both
    # sides report against a fixed server protocol list.
    server_protocols = ['http/1.1', 'spdy/2']
    # (client protocols, expected selection)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        # Pre-fill the scenario; %%s placeholders stay open for the
        # observed value and the side that reported it.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3822
def sni_contexts(self):
    # Build the three contexts used by the SNI tests and return them as
    # (server_context, other_context, client_context).  The two server
    # contexts load different certificates; the client loads SIGNING_CA
    # as its verify location.
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_side(SIGNED_CERTFILE)
    other_context = server_side(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3831
def check_common_name(self, stats, name):
    # Assert that the peer certificate stored in *stats* lists *name*
    # as a commonName entry of its subject.
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
3835
@needs_sni
def test_sni_callback(self):
    # The servername callback must receive the requested name and the
    # initial context, may swap the connection's context, and must stop
    # being called once it is unset.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    def handshake(requested_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=requested_name)

    stats = handshake('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The callback switched to other_context, which sni_contexts()
    # loads with SIGNED_CERTFILE2.
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # The callback is called with server_name=None
    stats = handshake(None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = handshake('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3876
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, other_context, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(deny_access)
    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003890
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def exploding_cb(ssl_sock, server_name, initial_context):
        # Deliberate ZeroDivisionError inside the callback.
        1/0
    server_context.set_servername_callback(exploding_cb)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    # The callback's exception must show up on stderr.
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003907
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, other_context, client_context = self.sni_contexts()

    def returns_a_string(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(returns_a_string)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_INTERNAL_ERROR')
    # The resulting TypeError must show up on stderr.
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003925
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must belong to the
    # family the server was restricted to.
    client_context, server_context, hostname = testing_context()
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_spec, server_spec = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_spec, server_spec = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_spec)
    server_context.set_ciphers(server_spec)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, _tls_version, _bits in ciphers:
        # Accept the name if alg1 is one of its "-"-separated tokens
        # or alg2 appears anywhere in it.
        token_match = alg1 in name.split("-")
        substring_match = alg2 in name
        if not (token_match or substring_match):
            self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003946
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on an SSL socket must raise ValueError once the
    socket has been connected and then closed."""
    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)

    with echo_server:
        raw_sock = socket.socket()
        closed_sock = client_context.wrap_socket(raw_sock,
                                                 server_hostname=hostname)
        closed_sock.connect((HOST, echo_server.port))
        closed_sock.close()

        # Both directions are checked while the server is still running.
        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003959
def test_sendfile(self):
    """Send a small file with SSLSocket.sendfile() to the echo server and
    check that the same bytes are received back."""
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # A single context is used for both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as src:
            s.sendfile(src)
            self.assertEqual(s.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003976
def test_session(self):
    """Exercise session creation and reuse over four connections.

    The connections alternate between "no session supplied" and "reuse
    the session of the first connection"; after each one the returned
    stats and the server context's 'accept'/'hits' session counters are
    checked.
    """
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server exchange; extra keywords go straight through.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_server_counters(accept, hits):
        # Compare the server-side session counters with expected values.
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    result = handshake()
    session = result['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(result['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    result = handshake(session=session)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(result['session_reused'])
    session2 = result['session']
    self.assertEqual(session2.id, session.id)
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    result = handshake()
    self.assertFalse(result['session_reused'])
    session3 = result['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    result = handshake(session=session)
    self.assertTrue(result['session_reused'])
    session4 = result['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004034
def test_session_handling(self):
    """Check the rules for reading and assigning SSLSocket.session.

    Covered cases: the attribute is None before the handshake, a
    non-session value is rejected with TypeError, assignment after the
    handshake is rejected with ValueError, assignment before connecting
    leads to a reused session, and a session belonging to another
    SSLContext is rejected with ValueError.
    """
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def new_client(ctx):
        # Fresh, not yet connected client socket wrapped by *ctx*.
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with new_client(client_context) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as caught:
                s.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with new_client(client_context) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                s.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with new_client(client_context) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with new_client(client_context2) as s:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as caught:
                s.session = session
                s.connect((HOST, server.port))
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004084
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004085
def test_main(verbose=False):
    """Run the SSL test classes.

    When ``support.verbose`` is set, a short report about the OpenSSL
    build and the host platform is printed first.  The certificate files
    used by the tests must exist, otherwise support.TestFailed is raised
    before any test runs.  NetworkedTests is only added when the
    'network' resource is enabled.

    The *verbose* parameter is accepted but never read by this function;
    the diagnostic output is controlled by ``support.verbose`` alone.
    """
    if support.verbose:
        import warnings

        # platform.linux_distribution() is deprecated and is missing from
        # newer Python releases; only probe it when it is available so the
        # report does not die with AttributeError.
        plats = {}
        linux_distribution = getattr(platform, 'linux_distribution', None)
        if linux_distribution is not None:
            plats['Linux'] = linux_distribution
        plats['Mac'] = platform.mac_ver
        plats['Windows'] = platform.win32_ver

        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            # Use the first probe that returns a non-empty leading field.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is optional; skip the line when it is absent.
            pass

    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Always pair threading_setup() with threading_cleanup(), even when
    # the test run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004139
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()