blob: ca2357e98e3e002a0b877950b9457bd351a565aa [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080058WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
Christian Heimesbd5c7d22018-01-20 15:16:30 +010060CERTFILE_INFO = {
61 'issuer': ((('countryName', 'XY'),),
62 (('localityName', 'Castle Anthrax'),),
63 (('organizationName', 'Python Software Foundation'),),
64 (('commonName', 'localhost'),)),
65 'notAfter': 'Jan 17 19:09:06 2028 GMT',
66 'notBefore': 'Jan 19 19:09:06 2018 GMT',
67 'serialNumber': 'F9BA076D5B6ABD9B',
68 'subject': ((('countryName', 'XY'),),
69 (('localityName', 'Castle Anthrax'),),
70 (('organizationName', 'Python Software Foundation'),),
71 (('commonName', 'localhost'),)),
72 'subjectAltName': (('DNS', 'localhost'),),
73 'version': 3
74}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
Christian Heimes22587792013-11-21 23:56:13 +010076# empty CRL
77CRLFILE = data_file("revocation.crl")
78
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010079# Two keys and certs signed by the same CA (for SNI tests)
80SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020081SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
83SIGNED_CERTFILE_INFO = {
84 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
85 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
86 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
87 'issuer': ((('countryName', 'XY'),),
88 (('organizationName', 'Python Software Foundation CA'),),
89 (('commonName', 'our-ca-server'),)),
90 'notAfter': 'Nov 28 19:09:06 2027 GMT',
91 'notBefore': 'Jan 19 19:09:06 2018 GMT',
92 'serialNumber': '82EDBF41C880919C',
93 'subject': ((('countryName', 'XY'),),
94 (('localityName', 'Castle Anthrax'),),
95 (('organizationName', 'Python Software Foundation'),),
96 (('commonName', 'localhost'),)),
97 'subjectAltName': (('DNS', 'localhost'),),
98 'version': 3
99}
100
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100101SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200102SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100103SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
104SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
105
Martin Panter3840b2a2016-03-27 01:53:46 +0000106# Same certificate as pycacert.pem, but without extra text in file
107SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200108# cert with all kinds of subject alt names
109ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100110IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100111
Martin Panter3d81d932016-01-14 09:36:00 +0000112REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000113
114EMPTYCERT = data_file("nullcert.pem")
115BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000116NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000117BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200118NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200119NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400121DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
Thomas Woutersed03b412007-08-28 21:37:11 +0000132def handle_error(prefix):
133 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000134 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000135 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitroub5218772010-05-21 09:56:06 +0000137def can_clear_options():
138 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200139 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
141def no_sslv2_implies_sslv3_hello():
142 # 0.9.7h or higher
143 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
144
Christian Heimes2427b502013-11-23 11:24:32 +0100145def have_verify_flags():
146 # 0.9.8 or higher
147 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
148
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800149def _have_secp_curves():
150 if not ssl.HAS_ECDH:
151 return False
152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
153 try:
154 ctx.set_ecdh_curve("secp384r1")
155 except ValueError:
156 return False
157 else:
158 return True
159
160
161HAVE_SECP_CURVES = _have_secp_curves()
162
163
Antoine Pitrouc695c952014-04-28 20:57:36 +0200164def utc_offset(): #NOTE: ignore issues like #1647654
165 # local time = utc time + utc offset
166 if time.daylight and time.localtime().tm_isdst > 0:
167 return -time.altzone # seconds
168 return -time.timezone
169
Christian Heimes9424bb42013-06-17 15:32:57 +0200170def asn1time(cert_time):
171 # Some versions of OpenSSL ignore seconds, see #18207
172 # 0.9.8.i
173 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
174 fmt = "%b %d %H:%M:%S %Y GMT"
175 dt = datetime.datetime.strptime(cert_time, fmt)
176 dt = dt.replace(second=0)
177 cert_time = dt.strftime(fmt)
178 # %d adds leading zero but ASN1_TIME_print() uses leading space
179 if cert_time[4] == "0":
180 cert_time = cert_time[:4] + " " + cert_time[5:]
181
182 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
185def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200186 if hasattr(ssl, 'PROTOCOL_SSLv2'):
187 @functools.wraps(func)
188 def f(*args, **kwargs):
189 try:
190 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
191 except ssl.SSLError:
192 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
193 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
194 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
195 return func(*args, **kwargs)
196 return f
197 else:
198 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100200needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
221def testing_context(server_cert=SIGNED_CERTFILE):
222 """Create context
223
224 client_context, server_context, hostname = testing_context()
225 """
226 if server_cert == SIGNED_CERTFILE:
227 hostname = SIGNED_CERTFILE_HOSTNAME
228 elif server_cert == SIGNED_CERTFILE2:
229 hostname = SIGNED_CERTFILE2_HOSTNAME
230 else:
231 raise ValueError(server_cert)
232
233 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
234 client_context.load_verify_locations(SIGNING_CA)
235
236 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
237 server_context.load_cert_chain(server_cert)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800238 client_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200239
240 return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
Antoine Pitrou480a1242010-04-28 21:37:09 +0000279 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000282 sys.stdout.write("\n RAND_status is %d (%s)\n"
283 % (v, (v and "sufficient randomness") or
284 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
286 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
287 self.assertEqual(len(data), 16)
288 self.assertEqual(is_cryptographic, v == 1)
289 if v:
290 data = ssl.RAND_bytes(16)
291 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200292 else:
293 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200294
Victor Stinner1e81a392013-12-19 16:47:04 +0100295 # negative num is invalid
296 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
297 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
298
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100299 if hasattr(ssl, 'RAND_egd'):
300 self.assertRaises(TypeError, ssl.RAND_egd, 1)
301 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000302 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200303 ssl.RAND_add(b"this is a random bytes object", 75.0)
304 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
Christian Heimesf77b4b22013-08-21 13:26:05 +0200306 @unittest.skipUnless(os.name == 'posix', 'requires posix')
307 def test_random_fork(self):
308 status = ssl.RAND_status()
309 if not status:
310 self.fail("OpenSSL's PRNG has insufficient randomness")
311
312 rfd, wfd = os.pipe()
313 pid = os.fork()
314 if pid == 0:
315 try:
316 os.close(rfd)
317 child_random = ssl.RAND_pseudo_bytes(16)[0]
318 self.assertEqual(len(child_random), 16)
319 os.write(wfd, child_random)
320 os.close(wfd)
321 except BaseException:
322 os._exit(1)
323 else:
324 os._exit(0)
325 else:
326 os.close(wfd)
327 self.addCleanup(os.close, rfd)
328 _, status = os.waitpid(pid, 0)
329 self.assertEqual(status, 0)
330
331 child_random = os.read(rfd, 16)
332 self.assertEqual(len(child_random), 16)
333 parent_random = ssl.RAND_pseudo_bytes(16)[0]
334 self.assertEqual(len(parent_random), 16)
335
336 self.assertNotEqual(child_random, parent_random)
337
Antoine Pitrou480a1242010-04-28 21:37:09 +0000338 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000339 # note that this uses an 'unofficial' function in _ssl.c,
340 # provided solely for this test, to exercise the certificate
341 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100342 self.assertEqual(
343 ssl._ssl._test_decode_cert(CERTFILE),
344 CERTFILE_INFO
345 )
346 self.assertEqual(
347 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
348 SIGNED_CERTFILE_INFO
349 )
350
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200351 # Issue #13034: the subjectAltName in some certificates
352 # (notably projects.developer.nokia.com:443) wasn't parsed
353 p = ssl._ssl._test_decode_cert(NOKIACERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 self.assertEqual(p['subjectAltName'],
357 (('DNS', 'projects.developer.nokia.com'),
358 ('DNS', 'projects.forum.nokia.com'))
359 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100360 # extra OCSP and AIA fields
361 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
362 self.assertEqual(p['caIssuers'],
363 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
364 self.assertEqual(p['crlDistributionPoints'],
365 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
Christian Heimes824f7f32013-08-17 00:54:47 +0200367 def test_parse_cert_CVE_2013_4238(self):
368 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
369 if support.verbose:
370 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
371 subject = ((('countryName', 'US'),),
372 (('stateOrProvinceName', 'Oregon'),),
373 (('localityName', 'Beaverton'),),
374 (('organizationName', 'Python Software Foundation'),),
375 (('organizationalUnitName', 'Python Core Development'),),
376 (('commonName', 'null.python.org\x00example.org'),),
377 (('emailAddress', 'python-dev@python.org'),))
378 self.assertEqual(p['subject'], subject)
379 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200380 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
381 san = (('DNS', 'altnull.python.org\x00example.com'),
382 ('email', 'null@python.org\x00user@example.org'),
383 ('URI', 'http://null.python.org\x00http://example.org'),
384 ('IP Address', '192.0.2.1'),
385 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
386 else:
387 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
388 san = (('DNS', 'altnull.python.org\x00example.com'),
389 ('email', 'null@python.org\x00user@example.org'),
390 ('URI', 'http://null.python.org\x00http://example.org'),
391 ('IP Address', '192.0.2.1'),
392 ('IP Address', '<invalid>'))
393
394 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200395
Christian Heimes1c03abd2016-09-06 23:25:35 +0200396 def test_parse_all_sans(self):
397 p = ssl._ssl._test_decode_cert(ALLSANFILE)
398 self.assertEqual(p['subjectAltName'],
399 (
400 ('DNS', 'allsans'),
401 ('othername', '<unsupported>'),
402 ('othername', '<unsupported>'),
403 ('email', 'user@example.org'),
404 ('DNS', 'www.example.org'),
405 ('DirName',
406 ((('countryName', 'XY'),),
407 (('localityName', 'Castle Anthrax'),),
408 (('organizationName', 'Python Software Foundation'),),
409 (('commonName', 'dirname example'),))),
410 ('URI', 'https://www.python.org/'),
411 ('IP Address', '127.0.0.1'),
412 ('IP Address', '0:0:0:0:0:0:0:1\n'),
413 ('Registered ID', '1.2.3.4.5')
414 )
415 )
416
Antoine Pitrou480a1242010-04-28 21:37:09 +0000417 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000418 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000419 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000420 d1 = ssl.PEM_cert_to_DER_cert(pem)
421 p2 = ssl.DER_cert_to_PEM_cert(d1)
422 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000423 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000424 if not p2.startswith(ssl.PEM_HEADER + '\n'):
425 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
426 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
427 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000428
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000429 def test_openssl_version(self):
430 n = ssl.OPENSSL_VERSION_NUMBER
431 t = ssl.OPENSSL_VERSION_INFO
432 s = ssl.OPENSSL_VERSION
433 self.assertIsInstance(n, int)
434 self.assertIsInstance(t, tuple)
435 self.assertIsInstance(s, str)
436 # Some sanity checks follow
437 # >= 0.9
438 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400439 # < 3.0
440 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000441 major, minor, fix, patch, status = t
442 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400443 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000444 self.assertGreaterEqual(minor, 0)
445 self.assertLess(minor, 256)
446 self.assertGreaterEqual(fix, 0)
447 self.assertLess(fix, 256)
448 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100449 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 self.assertGreaterEqual(status, 0)
451 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400452 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200453 if IS_LIBRESSL:
454 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100455 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400456 else:
457 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100458 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000459
Antoine Pitrou9d543662010-04-23 23:10:32 +0000460 @support.cpython_only
461 def test_refcycle(self):
462 # Issue #7943: an SSL object doesn't create reference cycles with
463 # itself.
464 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200465 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000466 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100467 with support.check_warnings(("", ResourceWarning)):
468 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100469 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000470
Antoine Pitroua468adc2010-09-14 14:43:44 +0000471 def test_wrapped_unconnected(self):
472 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200473 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000474 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200475 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100476 self.assertRaises(OSError, ss.recv, 1)
477 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
478 self.assertRaises(OSError, ss.recvfrom, 1)
479 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
480 self.assertRaises(OSError, ss.send, b'x')
481 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800482 self.assertRaises(NotImplementedError, ss.sendmsg,
483 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000484
Antoine Pitrou40f08742010-04-24 22:04:40 +0000485 def test_timeout(self):
486 # Issue #8524: when creating an SSL socket, the timeout of the
487 # original socket should be retained.
488 for timeout in (None, 0.0, 5.0):
489 s = socket.socket(socket.AF_INET)
490 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200491 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100492 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000493
Christian Heimesd0486372016-09-10 23:23:33 +0200494 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000495 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000496 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000497 "certfile must be specified",
498 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000499 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000500 "certfile must be specified for server-side operations",
501 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000502 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000503 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200504 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100505 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
506 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200507 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200508 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000509 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000510 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000511 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200512 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000513 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000514 ssl.wrap_socket(sock,
515 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000516 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200517 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000518 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000519 ssl.wrap_socket(sock,
520 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000521 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522
Martin Panter3464ea22016-02-01 21:58:11 +0000523 def bad_cert_test(self, certfile):
524 """Check that trying to use the given client certificate fails"""
525 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
526 certfile)
527 sock = socket.socket()
528 self.addCleanup(sock.close)
529 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200530 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200531 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000532
533 def test_empty_cert(self):
534 """Wrapping with an empty cert file"""
535 self.bad_cert_test("nullcert.pem")
536
537 def test_malformed_cert(self):
538 """Wrapping with a badly formatted certificate (syntax error)"""
539 self.bad_cert_test("badcert.pem")
540
541 def test_malformed_key(self):
542 """Wrapping with a badly formatted key (syntax error)"""
543 self.bad_cert_test("badkey.pem")
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 def test_match_hostname(self):
546 def ok(cert, hostname):
547 ssl.match_hostname(cert, hostname)
548 def fail(cert, hostname):
549 self.assertRaises(ssl.CertificateError,
550 ssl.match_hostname, cert, hostname)
551
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100552 # -- Hostname matching --
553
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000554 cert = {'subject': ((('commonName', 'example.com'),),)}
555 ok(cert, 'example.com')
556 ok(cert, 'ExAmple.cOm')
557 fail(cert, 'www.example.com')
558 fail(cert, '.example.com')
559 fail(cert, 'example.org')
560 fail(cert, 'exampleXcom')
561
562 cert = {'subject': ((('commonName', '*.a.com'),),)}
563 ok(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.a.com')
565 fail(cert, 'a.com')
566 fail(cert, 'Xa.com')
567 fail(cert, '.a.com')
568
Mandeep Singhede2ac92017-11-27 04:01:27 +0530569 # only match wildcards when they are the only thing
570 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000571 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530572 fail(cert, 'foo.com')
573 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000574 fail(cert, 'bar.com')
575 fail(cert, 'foo.a.com')
576 fail(cert, 'bar.foo.com')
577
Christian Heimes824f7f32013-08-17 00:54:47 +0200578 # NULL bytes are bad, CVE-2013-4073
579 cert = {'subject': ((('commonName',
580 'null.python.org\x00example.org'),),)}
581 ok(cert, 'null.python.org\x00example.org') # or raise an error?
582 fail(cert, 'example.org')
583 fail(cert, 'null.python.org')
584
Georg Brandl72c98d32013-10-27 07:16:53 +0100585 # error cases with wildcards
586 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
592 cert = {'subject': ((('commonName', 'a.*.com'),),)}
593 fail(cert, 'a.foo.com')
594 fail(cert, 'a..com')
595 fail(cert, 'a.com')
596
597 # wildcard doesn't match IDNA prefix 'xn--'
598 idna = 'püthon.python.org'.encode("idna").decode("ascii")
599 cert = {'subject': ((('commonName', idna),),)}
600 ok(cert, idna)
601 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
602 fail(cert, idna)
603 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
604 fail(cert, idna)
605
606 # wildcard in first fragment and IDNA A-labels in sequent fragments
607 # are supported.
608 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530610 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
611 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
613 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
614
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000615 # Slightly fake real-world example
616 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
617 'subject': ((('commonName', 'linuxfrz.org'),),),
618 'subjectAltName': (('DNS', 'linuxfr.org'),
619 ('DNS', 'linuxfr.com'),
620 ('othername', '<unsupported>'))}
621 ok(cert, 'linuxfr.org')
622 ok(cert, 'linuxfr.com')
623 # Not a "DNS" entry
624 fail(cert, '<unsupported>')
625 # When there is a subjectAltName, commonName isn't used
626 fail(cert, 'linuxfrz.org')
627
628 # A pristine real-world example
629 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
630 'subject': ((('countryName', 'US'),),
631 (('stateOrProvinceName', 'California'),),
632 (('localityName', 'Mountain View'),),
633 (('organizationName', 'Google Inc'),),
634 (('commonName', 'mail.google.com'),))}
635 ok(cert, 'mail.google.com')
636 fail(cert, 'gmail.com')
637 # Only commonName is considered
638 fail(cert, 'California')
639
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100640 # -- IPv4 matching --
641 cert = {'subject': ((('commonName', 'example.com'),),),
642 'subjectAltName': (('DNS', 'example.com'),
643 ('IP Address', '10.11.12.13'),
644 ('IP Address', '14.15.16.17'))}
645 ok(cert, '10.11.12.13')
646 ok(cert, '14.15.16.17')
647 fail(cert, '14.15.16.18')
648 fail(cert, 'example.net')
649
650 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800651 if hasattr(socket, 'AF_INET6'):
652 cert = {'subject': ((('commonName', 'example.com'),),),
653 'subjectAltName': (
654 ('DNS', 'example.com'),
655 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
656 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
657 ok(cert, '2001::cafe')
658 ok(cert, '2003::baba')
659 fail(cert, '2003::bebe')
660 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661
662 # -- Miscellaneous --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Neither commonName nor subjectAltName
665 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
666 'subject': ((('countryName', 'US'),),
667 (('stateOrProvinceName', 'California'),),
668 (('localityName', 'Mountain View'),),
669 (('organizationName', 'Google Inc'),))}
670 fail(cert, 'mail.google.com')
671
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200672 # No DNS entry in subjectAltName but a commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('commonName', 'mail.google.com'),)),
678 'subjectAltName': (('othername', 'blabla'), )}
679 ok(cert, 'mail.google.com')
680
681 # No DNS entry subjectAltName and no commonName
682 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
683 'subject': ((('countryName', 'US'),),
684 (('stateOrProvinceName', 'California'),),
685 (('localityName', 'Mountain View'),),
686 (('organizationName', 'Google Inc'),)),
687 'subjectAltName': (('othername', 'blabla'),)}
688 fail(cert, 'google.com')
689
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000690 # Empty cert / no cert
691 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
692 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
693
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200694 # Issue #17980: avoid denials of service by refusing more than one
695 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800696 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "partial wildcards in leftmost label are not supported"):
700 ssl.match_hostname(cert, 'axxb.example.com')
701
702 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "wildcard can only be present in the leftmost label"):
706 ssl.match_hostname(cert, 'www.sub.example.com')
707
708 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 "too many wildcards"):
712 ssl.match_hostname(cert, 'axxbxxc.example.com')
713
714 cert = {'subject': ((('commonName', '*'),),)}
715 with self.assertRaisesRegex(
716 ssl.CertificateError,
717 "sole wildcard without additional labels are not support"):
718 ssl.match_hostname(cert, 'host')
719
720 cert = {'subject': ((('commonName', '*.com'),),)}
721 with self.assertRaisesRegex(
722 ssl.CertificateError,
723 r"hostname 'com' doesn't match '\*.com'"):
724 ssl.match_hostname(cert, 'com')
725
726 # extra checks for _inet_paton()
727 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
728 with self.assertRaises(ValueError):
729 ssl._inet_paton(invalid)
730 for ipaddr in ['127.0.0.1', '192.168.0.1']:
731 self.assertTrue(ssl._inet_paton(ipaddr))
732 if hasattr(socket, 'AF_INET6'):
733 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
734 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200735
Antoine Pitroud5323212010-10-22 18:19:07 +0000736 def test_server_side(self):
737 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000739 with socket.socket() as sock:
740 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
741 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000742
Antoine Pitroud6494802011-07-21 01:11:30 +0200743 def test_unknown_channel_binding(self):
744 # should raise ValueError for unknown type
745 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200746 s.bind(('127.0.0.1', 0))
747 s.listen()
748 c = socket.socket(socket.AF_INET)
749 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200750 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100751 with self.assertRaises(ValueError):
752 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200753 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
755 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
756 "'tls-unique' channel binding not available")
757 def test_tls_unique_channel_binding(self):
758 # unconnected should return None for known type
759 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200760 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100761 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200762 # the same for server-side
763 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200764 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100765 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200766
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600767 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200768 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600769 r = repr(ss)
770 with self.assertWarns(ResourceWarning) as cm:
771 ss = None
772 support.gc_collect()
773 self.assertIn(r, str(cm.warning.args[0]))
774
Christian Heimes6d7ad132013-06-09 18:02:55 +0200775 def test_get_default_verify_paths(self):
776 paths = ssl.get_default_verify_paths()
777 self.assertEqual(len(paths), 6)
778 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
779
780 with support.EnvironmentVarGuard() as env:
781 env["SSL_CERT_DIR"] = CAPATH
782 env["SSL_CERT_FILE"] = CERTFILE
783 paths = ssl.get_default_verify_paths()
784 self.assertEqual(paths.cafile, CERTFILE)
785 self.assertEqual(paths.capath, CAPATH)
786
Christian Heimes44109d72013-11-22 01:51:30 +0100787 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
788 def test_enum_certificates(self):
789 self.assertTrue(ssl.enum_certificates("CA"))
790 self.assertTrue(ssl.enum_certificates("ROOT"))
791
792 self.assertRaises(TypeError, ssl.enum_certificates)
793 self.assertRaises(WindowsError, ssl.enum_certificates, "")
794
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 trust_oids = set()
796 for storename in ("CA", "ROOT"):
797 store = ssl.enum_certificates(storename)
798 self.assertIsInstance(store, list)
799 for element in store:
800 self.assertIsInstance(element, tuple)
801 self.assertEqual(len(element), 3)
802 cert, enc, trust = element
803 self.assertIsInstance(cert, bytes)
804 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
805 self.assertIsInstance(trust, (set, bool))
806 if isinstance(trust, set):
807 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100808
809 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100810 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200811
Christian Heimes46bebee2013-06-09 19:03:31 +0200812 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100813 def test_enum_crls(self):
814 self.assertTrue(ssl.enum_crls("CA"))
815 self.assertRaises(TypeError, ssl.enum_crls)
816 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200817
Christian Heimes44109d72013-11-22 01:51:30 +0100818 crls = ssl.enum_crls("CA")
819 self.assertIsInstance(crls, list)
820 for element in crls:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 2)
823 self.assertIsInstance(element[0], bytes)
824 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100827 def test_asn1object(self):
828 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
829 '1.3.6.1.5.5.7.3.1')
830
831 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
832 self.assertEqual(val, expected)
833 self.assertEqual(val.nid, 129)
834 self.assertEqual(val.shortname, 'serverAuth')
835 self.assertEqual(val.longname, 'TLS Web Server Authentication')
836 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
839
840 val = ssl._ASN1Object.fromnid(129)
841 self.assertEqual(val, expected)
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100844 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
845 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100846 for i in range(1000):
847 try:
848 obj = ssl._ASN1Object.fromnid(i)
849 except ValueError:
850 pass
851 else:
852 self.assertIsInstance(obj.nid, int)
853 self.assertIsInstance(obj.shortname, str)
854 self.assertIsInstance(obj.longname, str)
855 self.assertIsInstance(obj.oid, (str, type(None)))
856
857 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
858 self.assertEqual(val, expected)
859 self.assertIsInstance(val, ssl._ASN1Object)
860 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
861 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
862 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100863 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
864 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100865
Christian Heimes72d28502013-11-23 13:56:58 +0100866 def test_purpose_enum(self):
867 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
868 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
873 '1.3.6.1.5.5.7.3.1')
874
875 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
876 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
881 '1.3.6.1.5.5.7.3.2')
882
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100883 def test_unsupported_dtls(self):
884 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
885 self.addCleanup(s.close)
886 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200887 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100888 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200889 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100890 with self.assertRaises(NotImplementedError) as cx:
891 ctx.wrap_socket(s)
892 self.assertEqual(str(cx.exception), "only stream sockets are supported")
893
Antoine Pitrouc695c952014-04-28 20:57:36 +0200894 def cert_time_ok(self, timestring, timestamp):
895 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
896
897 def cert_time_fail(self, timestring):
898 with self.assertRaises(ValueError):
899 ssl.cert_time_to_seconds(timestring)
900
901 @unittest.skipUnless(utc_offset(),
902 'local time needs to be different from UTC')
903 def test_cert_time_to_seconds_timezone(self):
904 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
905 # results if local timezone is not UTC
906 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
907 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
908
909 def test_cert_time_to_seconds(self):
910 timestring = "Jan 5 09:34:43 2018 GMT"
911 ts = 1515144883.0
912 self.cert_time_ok(timestring, ts)
913 # accept keyword parameter, assert its name
914 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
915 # accept both %e and %d (space or zero generated by strftime)
916 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
917 # case-insensitive
918 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
919 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
920 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
921 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
922 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
923 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
924 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
925 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
926
927 newyear_ts = 1230768000.0
928 # leap seconds
929 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
930 # same timestamp
931 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
932
933 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
934 # allow 60th second (even if it is not a leap second)
935 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
936 # allow 2nd leap second for compatibility with time.strptime()
937 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
938 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
939
Mike53f7a7c2017-12-14 14:04:53 +0300940 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200941 # 99991231235959Z (rfc 5280)
942 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
943
944 @support.run_with_locale('LC_ALL', '')
945 def test_cert_time_to_seconds_locale(self):
946 # `cert_time_to_seconds()` should be locale independent
947
948 def local_february_name():
949 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
950
951 if local_february_name().lower() == 'feb':
952 self.skipTest("locale-specific month name needs to be "
953 "different from C locale")
954
955 # locale-independent
956 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
957 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
958
Martin Panter3840b2a2016-03-27 01:53:46 +0000959 def test_connect_ex_error(self):
960 server = socket.socket(socket.AF_INET)
961 self.addCleanup(server.close)
962 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200963 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000964 cert_reqs=ssl.CERT_REQUIRED)
965 self.addCleanup(s.close)
966 rc = s.connect_ex((HOST, port))
967 # Issue #19919: Windows machines or VMs hosted on Windows
968 # machines sometimes return EWOULDBLOCK.
969 errors = (
970 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
971 errno.EWOULDBLOCK,
972 )
973 self.assertIn(rc, errors)
974
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100975
Antoine Pitrou152efa22010-05-16 18:19:27 +0000976class ContextTests(unittest.TestCase):
977
Antoine Pitrou23df4832010-08-04 17:14:06 +0000978 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000979 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100980 for protocol in PROTOCOLS:
981 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200982 ctx = ssl.SSLContext()
983 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000984 self.assertRaises(ValueError, ssl.SSLContext, -1)
985 self.assertRaises(ValueError, ssl.SSLContext, 42)
986
Antoine Pitrou23df4832010-08-04 17:14:06 +0000987 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000988 def test_protocol(self):
989 for proto in PROTOCOLS:
990 ctx = ssl.SSLContext(proto)
991 self.assertEqual(ctx.protocol, proto)
992
993 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.set_ciphers("ALL")
996 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000997 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000998 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999
Christian Heimes892d66e2018-01-29 14:10:18 +01001000 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1001 "Test applies only to Python default ciphers")
1002 def test_python_ciphers(self):
1003 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1004 ciphers = ctx.get_ciphers()
1005 for suite in ciphers:
1006 name = suite['name']
1007 self.assertNotIn("PSK", name)
1008 self.assertNotIn("SRP", name)
1009 self.assertNotIn("MD5", name)
1010 self.assertNotIn("RC4", name)
1011 self.assertNotIn("3DES", name)
1012
Christian Heimes25bfcd52016-09-06 00:04:45 +02001013 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1014 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001016 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001017 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001018 self.assertIn('AES256-GCM-SHA384', names)
1019 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001020
Antoine Pitrou23df4832010-08-04 17:14:06 +00001021 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001022 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001024 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001025 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001026 # SSLContext also enables these by default
1027 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001028 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1029 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001030 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001031 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001032 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001033 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001034 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1035 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001036 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001037 # Ubuntu has OP_NO_SSLv3 forced on by default
1038 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001039 else:
1040 with self.assertRaises(ValueError):
1041 ctx.options = 0
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 def test_verify_mode_protocol(self):
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 # Default value
1046 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1047 ctx.verify_mode = ssl.CERT_OPTIONAL
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1049 ctx.verify_mode = ssl.CERT_REQUIRED
1050 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1051 ctx.verify_mode = ssl.CERT_NONE
1052 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1053 with self.assertRaises(TypeError):
1054 ctx.verify_mode = None
1055 with self.assertRaises(ValueError):
1056 ctx.verify_mode = 42
1057
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 self.assertFalse(ctx.check_hostname)
1061
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1064 self.assertTrue(ctx.check_hostname)
1065
Christian Heimes61d478c2018-01-27 15:51:38 +01001066 def test_hostname_checks_common_name(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertTrue(ctx.hostname_checks_common_name)
1069 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1070 ctx.hostname_checks_common_name = True
1071 self.assertTrue(ctx.hostname_checks_common_name)
1072 ctx.hostname_checks_common_name = False
1073 self.assertFalse(ctx.hostname_checks_common_name)
1074 ctx.hostname_checks_common_name = True
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 else:
1077 with self.assertRaises(AttributeError):
1078 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001079
Christian Heimes2427b502013-11-23 11:24:32 +01001080 @unittest.skipUnless(have_verify_flags(),
1081 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001082 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001083 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001084 # default value
1085 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1086 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001087 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1088 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1089 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1090 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1091 ctx.verify_flags = ssl.VERIFY_DEFAULT
1092 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1093 # supports any value
1094 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1095 self.assertEqual(ctx.verify_flags,
1096 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1097 with self.assertRaises(TypeError):
1098 ctx.verify_flags = None
1099
Antoine Pitrou152efa22010-05-16 18:19:27 +00001100 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001101 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001102 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001103 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001104 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1105 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001106 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001107 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001108 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001109 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001110 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001111 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001112 ctx.load_cert_chain(EMPTYCERT)
1113 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001114 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001115 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1116 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1117 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001118 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001119 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001120 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001121 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001122 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001123 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1124 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001125 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001126 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001127 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001128 # Password protected key and cert
1129 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1130 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1131 ctx.load_cert_chain(CERTFILE_PROTECTED,
1132 password=bytearray(KEY_PASSWORD.encode()))
1133 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1134 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1135 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1136 bytearray(KEY_PASSWORD.encode()))
1137 with self.assertRaisesRegex(TypeError, "should be a string"):
1138 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1139 with self.assertRaises(ssl.SSLError):
1140 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1141 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1142 # openssl has a fixed limit on the password buffer.
1143 # PEM_BUFSIZE is generally set to 1kb.
1144 # Return a string larger than this.
1145 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1146 # Password callback
1147 def getpass_unicode():
1148 return KEY_PASSWORD
1149 def getpass_bytes():
1150 return KEY_PASSWORD.encode()
1151 def getpass_bytearray():
1152 return bytearray(KEY_PASSWORD.encode())
1153 def getpass_badpass():
1154 return "badpass"
1155 def getpass_huge():
1156 return b'a' * (1024 * 1024)
1157 def getpass_bad_type():
1158 return 9
1159 def getpass_exception():
1160 raise Exception('getpass error')
1161 class GetPassCallable:
1162 def __call__(self):
1163 return KEY_PASSWORD
1164 def getpass(self):
1165 return KEY_PASSWORD
1166 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1167 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1168 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1169 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1170 ctx.load_cert_chain(CERTFILE_PROTECTED,
1171 password=GetPassCallable().getpass)
1172 with self.assertRaises(ssl.SSLError):
1173 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1174 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1175 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1176 with self.assertRaisesRegex(TypeError, "must return a string"):
1177 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1178 with self.assertRaisesRegex(Exception, "getpass error"):
1179 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1180 # Make sure the password function isn't called if it isn't needed
1181 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001182
1183 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001184 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001185 ctx.load_verify_locations(CERTFILE)
1186 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1187 ctx.load_verify_locations(BYTES_CERTFILE)
1188 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1189 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001190 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001191 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001192 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001193 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001194 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001195 ctx.load_verify_locations(BADCERT)
1196 ctx.load_verify_locations(CERTFILE, CAPATH)
1197 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1198
Victor Stinner80f75e62011-01-29 11:31:20 +00001199 # Issue #10989: crash if the second argument type is invalid
1200 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1201
Christian Heimesefff7062013-11-21 03:35:02 +01001202 def test_load_verify_cadata(self):
1203 # test cadata
1204 with open(CAFILE_CACERT) as f:
1205 cacert_pem = f.read()
1206 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1207 with open(CAFILE_NEURONIO) as f:
1208 neuronio_pem = f.read()
1209 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1210
1211 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001212 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001213 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1214 ctx.load_verify_locations(cadata=cacert_pem)
1215 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1216 ctx.load_verify_locations(cadata=neuronio_pem)
1217 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1218 # cert already in hash table
1219 ctx.load_verify_locations(cadata=neuronio_pem)
1220 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1221
1222 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001223 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001224 combined = "\n".join((cacert_pem, neuronio_pem))
1225 ctx.load_verify_locations(cadata=combined)
1226 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1227
1228 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001229 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001230 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1231 neuronio_pem, "tail"]
1232 ctx.load_verify_locations(cadata="\n".join(combined))
1233 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1234
1235 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001236 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001237 ctx.load_verify_locations(cadata=cacert_der)
1238 ctx.load_verify_locations(cadata=neuronio_der)
1239 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1240 # cert already in hash table
1241 ctx.load_verify_locations(cadata=cacert_der)
1242 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1243
1244 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001245 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001246 combined = b"".join((cacert_der, neuronio_der))
1247 ctx.load_verify_locations(cadata=combined)
1248 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1249
1250 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001251 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001252 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1253
1254 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1255 ctx.load_verify_locations(cadata="broken")
1256 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1257 ctx.load_verify_locations(cadata=b"broken")
1258
1259
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001260 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001261 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001262 ctx.load_dh_params(DHFILE)
1263 if os.name != 'nt':
1264 ctx.load_dh_params(BYTES_DHFILE)
1265 self.assertRaises(TypeError, ctx.load_dh_params)
1266 self.assertRaises(TypeError, ctx.load_dh_params, None)
1267 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001268 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001269 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001270 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001271 ctx.load_dh_params(CERTFILE)
1272
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001273 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001274 def test_session_stats(self):
1275 for proto in PROTOCOLS:
1276 ctx = ssl.SSLContext(proto)
1277 self.assertEqual(ctx.session_stats(), {
1278 'number': 0,
1279 'connect': 0,
1280 'connect_good': 0,
1281 'connect_renegotiate': 0,
1282 'accept': 0,
1283 'accept_good': 0,
1284 'accept_renegotiate': 0,
1285 'hits': 0,
1286 'misses': 0,
1287 'timeouts': 0,
1288 'cache_full': 0,
1289 })
1290
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001291 def test_set_default_verify_paths(self):
1292 # There's not much we can do to test that it acts as expected,
1293 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001294 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001295 ctx.set_default_verify_paths()
1296
Antoine Pitrou501da612011-12-21 09:27:41 +01001297 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001298 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001299 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001300 ctx.set_ecdh_curve("prime256v1")
1301 ctx.set_ecdh_curve(b"prime256v1")
1302 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1303 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1304 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1305 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1306
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001307 @needs_sni
1308 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001310
1311 # set_servername_callback expects a callable, or None
1312 self.assertRaises(TypeError, ctx.set_servername_callback)
1313 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1314 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1315 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1316
1317 def dummycallback(sock, servername, ctx):
1318 pass
1319 ctx.set_servername_callback(None)
1320 ctx.set_servername_callback(dummycallback)
1321
1322 @needs_sni
1323 def test_sni_callback_refcycle(self):
1324 # Reference cycles through the servername callback are detected
1325 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001326 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001327 def dummycallback(sock, servername, ctx, cycle=ctx):
1328 pass
1329 ctx.set_servername_callback(dummycallback)
1330 wr = weakref.ref(ctx)
1331 del ctx, dummycallback
1332 gc.collect()
1333 self.assertIs(wr(), None)
1334
Christian Heimes9a5395a2013-06-17 15:44:12 +02001335 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001336 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001337 self.assertEqual(ctx.cert_store_stats(),
1338 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1339 ctx.load_cert_chain(CERTFILE)
1340 self.assertEqual(ctx.cert_store_stats(),
1341 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1342 ctx.load_verify_locations(CERTFILE)
1343 self.assertEqual(ctx.cert_store_stats(),
1344 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001345 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001346 self.assertEqual(ctx.cert_store_stats(),
1347 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1348
1349 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001350 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001351 self.assertEqual(ctx.get_ca_certs(), [])
1352 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1353 ctx.load_verify_locations(CERTFILE)
1354 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001355 # but CAFILE_CACERT is a CA cert
1356 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001357 self.assertEqual(ctx.get_ca_certs(),
1358 [{'issuer': ((('organizationName', 'Root CA'),),
1359 (('organizationalUnitName', 'http://www.cacert.org'),),
1360 (('commonName', 'CA Cert Signing Authority'),),
1361 (('emailAddress', 'support@cacert.org'),)),
1362 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1363 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1364 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001365 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001366 'subject': ((('organizationName', 'Root CA'),),
1367 (('organizationalUnitName', 'http://www.cacert.org'),),
1368 (('commonName', 'CA Cert Signing Authority'),),
1369 (('emailAddress', 'support@cacert.org'),)),
1370 'version': 3}])
1371
Martin Panterb55f8b72016-01-14 12:53:56 +00001372 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001373 pem = f.read()
1374 der = ssl.PEM_cert_to_DER_cert(pem)
1375 self.assertEqual(ctx.get_ca_certs(True), [der])
1376
Christian Heimes72d28502013-11-23 13:56:58 +01001377 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001378 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001379 ctx.load_default_certs()
1380
Christian Heimesa170fa12017-09-15 20:27:30 +02001381 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001382 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1383 ctx.load_default_certs()
1384
Christian Heimesa170fa12017-09-15 20:27:30 +02001385 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001386 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1387
Christian Heimesa170fa12017-09-15 20:27:30 +02001388 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001389 self.assertRaises(TypeError, ctx.load_default_certs, None)
1390 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1391
Benjamin Peterson91244e02014-10-03 18:17:15 -04001392 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001393 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001394 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001395 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001396 with support.EnvironmentVarGuard() as env:
1397 env["SSL_CERT_DIR"] = CAPATH
1398 env["SSL_CERT_FILE"] = CERTFILE
1399 ctx.load_default_certs()
1400 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1401
Benjamin Peterson91244e02014-10-03 18:17:15 -04001402 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001403 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001404 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001405 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001406 ctx.load_default_certs()
1407 stats = ctx.cert_store_stats()
1408
Christian Heimesa170fa12017-09-15 20:27:30 +02001409 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001410 with support.EnvironmentVarGuard() as env:
1411 env["SSL_CERT_DIR"] = CAPATH
1412 env["SSL_CERT_FILE"] = CERTFILE
1413 ctx.load_default_certs()
1414 stats["x509"] += 1
1415 self.assertEqual(ctx.cert_store_stats(), stats)
1416
Christian Heimes358cfd42016-09-10 22:43:48 +02001417 def _assert_context_options(self, ctx):
1418 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1419 if OP_NO_COMPRESSION != 0:
1420 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1421 OP_NO_COMPRESSION)
1422 if OP_SINGLE_DH_USE != 0:
1423 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1424 OP_SINGLE_DH_USE)
1425 if OP_SINGLE_ECDH_USE != 0:
1426 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1427 OP_SINGLE_ECDH_USE)
1428 if OP_CIPHER_SERVER_PREFERENCE != 0:
1429 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1430 OP_CIPHER_SERVER_PREFERENCE)
1431
Christian Heimes4c05b472013-11-23 15:58:30 +01001432 def test_create_default_context(self):
1433 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001434
Christian Heimesa170fa12017-09-15 20:27:30 +02001435 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001436 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001437 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001438 self._assert_context_options(ctx)
1439
Christian Heimes4c05b472013-11-23 15:58:30 +01001440 with open(SIGNING_CA) as f:
1441 cadata = f.read()
1442 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1443 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001445 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001446 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001447
1448 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001450 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001451 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001452
Christian Heimes67986f92013-11-23 22:43:47 +01001453 def test__create_stdlib_context(self):
1454 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001455 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001456 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001457 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001458 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001459
1460 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1461 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1462 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001463 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001464
1465 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001466 cert_reqs=ssl.CERT_REQUIRED,
1467 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001468 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1469 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001470 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001471 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001472
1473 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001475 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001476 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001477
Christian Heimes1aa9a752013-12-02 02:41:19 +01001478 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001479 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001480 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001481 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001482
Christian Heimese82c0342017-09-15 20:29:57 +02001483 # Auto set CERT_REQUIRED
1484 ctx.check_hostname = True
1485 self.assertTrue(ctx.check_hostname)
1486 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1487 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001488 ctx.verify_mode = ssl.CERT_REQUIRED
1489 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001490 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001491
Christian Heimese82c0342017-09-15 20:29:57 +02001492 # Changing verify_mode does not affect check_hostname
1493 ctx.check_hostname = False
1494 ctx.verify_mode = ssl.CERT_NONE
1495 ctx.check_hostname = False
1496 self.assertFalse(ctx.check_hostname)
1497 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1498 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001499 ctx.check_hostname = True
1500 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001501 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1502
1503 ctx.check_hostname = False
1504 ctx.verify_mode = ssl.CERT_OPTIONAL
1505 ctx.check_hostname = False
1506 self.assertFalse(ctx.check_hostname)
1507 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1508 # keep CERT_OPTIONAL
1509 ctx.check_hostname = True
1510 self.assertTrue(ctx.check_hostname)
1511 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001512
1513 # Cannot set CERT_NONE with check_hostname enabled
1514 with self.assertRaises(ValueError):
1515 ctx.verify_mode = ssl.CERT_NONE
1516 ctx.check_hostname = False
1517 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001518 ctx.verify_mode = ssl.CERT_NONE
1519 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001520
Christian Heimes5fe668c2016-09-12 00:01:11 +02001521 def test_context_client_server(self):
1522 # PROTOCOL_TLS_CLIENT has sane defaults
1523 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1524 self.assertTrue(ctx.check_hostname)
1525 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1526
1527 # PROTOCOL_TLS_SERVER has different but also sane defaults
1528 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1529 self.assertFalse(ctx.check_hostname)
1530 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1531
Christian Heimes4df60f12017-09-15 20:26:05 +02001532 def test_context_custom_class(self):
1533 class MySSLSocket(ssl.SSLSocket):
1534 pass
1535
1536 class MySSLObject(ssl.SSLObject):
1537 pass
1538
1539 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1540 ctx.sslsocket_class = MySSLSocket
1541 ctx.sslobject_class = MySSLObject
1542
1543 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1544 self.assertIsInstance(sock, MySSLSocket)
1545 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1546 self.assertIsInstance(obj, MySSLObject)
1547
Antoine Pitrou152efa22010-05-16 18:19:27 +00001548
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001549class SSLErrorTests(unittest.TestCase):
1550
1551 def test_str(self):
1552 # The str() of a SSLError doesn't include the errno
1553 e = ssl.SSLError(1, "foo")
1554 self.assertEqual(str(e), "foo")
1555 self.assertEqual(e.errno, 1)
1556 # Same for a subclass
1557 e = ssl.SSLZeroReturnError(1, "foo")
1558 self.assertEqual(str(e), "foo")
1559 self.assertEqual(e.errno, 1)
1560
1561 def test_lib_reason(self):
1562 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001563 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001564 with self.assertRaises(ssl.SSLError) as cm:
1565 ctx.load_dh_params(CERTFILE)
1566 self.assertEqual(cm.exception.library, 'PEM')
1567 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1568 s = str(cm.exception)
1569 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1570
1571 def test_subclass(self):
1572 # Check that the appropriate SSLError subclass is raised
1573 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001574 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1575 ctx.check_hostname = False
1576 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001577 with socket.socket() as s:
1578 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001579 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001580 c = socket.socket()
1581 c.connect(s.getsockname())
1582 c.setblocking(False)
1583 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001584 with self.assertRaises(ssl.SSLWantReadError) as cm:
1585 c.do_handshake()
1586 s = str(cm.exception)
1587 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1588 # For compatibility
1589 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1590
1591
Christian Heimes61d478c2018-01-27 15:51:38 +01001592 def test_bad_server_hostname(self):
1593 ctx = ssl.create_default_context()
1594 with self.assertRaises(ValueError):
1595 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1596 server_hostname="")
1597 with self.assertRaises(ValueError):
1598 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1599 server_hostname=".example.org")
1600
1601
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001602class MemoryBIOTests(unittest.TestCase):
1603
1604 def test_read_write(self):
1605 bio = ssl.MemoryBIO()
1606 bio.write(b'foo')
1607 self.assertEqual(bio.read(), b'foo')
1608 self.assertEqual(bio.read(), b'')
1609 bio.write(b'foo')
1610 bio.write(b'bar')
1611 self.assertEqual(bio.read(), b'foobar')
1612 self.assertEqual(bio.read(), b'')
1613 bio.write(b'baz')
1614 self.assertEqual(bio.read(2), b'ba')
1615 self.assertEqual(bio.read(1), b'z')
1616 self.assertEqual(bio.read(1), b'')
1617
1618 def test_eof(self):
1619 bio = ssl.MemoryBIO()
1620 self.assertFalse(bio.eof)
1621 self.assertEqual(bio.read(), b'')
1622 self.assertFalse(bio.eof)
1623 bio.write(b'foo')
1624 self.assertFalse(bio.eof)
1625 bio.write_eof()
1626 self.assertFalse(bio.eof)
1627 self.assertEqual(bio.read(2), b'fo')
1628 self.assertFalse(bio.eof)
1629 self.assertEqual(bio.read(1), b'o')
1630 self.assertTrue(bio.eof)
1631 self.assertEqual(bio.read(), b'')
1632 self.assertTrue(bio.eof)
1633
1634 def test_pending(self):
1635 bio = ssl.MemoryBIO()
1636 self.assertEqual(bio.pending, 0)
1637 bio.write(b'foo')
1638 self.assertEqual(bio.pending, 3)
1639 for i in range(3):
1640 bio.read(1)
1641 self.assertEqual(bio.pending, 3-i-1)
1642 for i in range(3):
1643 bio.write(b'x')
1644 self.assertEqual(bio.pending, i+1)
1645 bio.read()
1646 self.assertEqual(bio.pending, 0)
1647
1648 def test_buffer_types(self):
1649 bio = ssl.MemoryBIO()
1650 bio.write(b'foo')
1651 self.assertEqual(bio.read(), b'foo')
1652 bio.write(bytearray(b'bar'))
1653 self.assertEqual(bio.read(), b'bar')
1654 bio.write(memoryview(b'baz'))
1655 self.assertEqual(bio.read(), b'baz')
1656
1657 def test_error_types(self):
1658 bio = ssl.MemoryBIO()
1659 self.assertRaises(TypeError, bio.write, 'foo')
1660 self.assertRaises(TypeError, bio.write, None)
1661 self.assertRaises(TypeError, bio.write, True)
1662 self.assertRaises(TypeError, bio.write, 1)
1663
1664
Christian Heimes89c20512018-02-27 11:17:32 +01001665class SSLObjectTests(unittest.TestCase):
1666 def test_private_init(self):
1667 bio = ssl.MemoryBIO()
1668 with self.assertRaisesRegex(TypeError, "public constructor"):
1669 ssl.SSLObject(bio, bio)
1670
1671
Martin Panter3840b2a2016-03-27 01:53:46 +00001672class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001673 """Tests that connect to a simple server running in the background"""
1674
1675 def setUp(self):
1676 server = ThreadedEchoServer(SIGNED_CERTFILE)
1677 self.server_addr = (HOST, server.port)
1678 server.__enter__()
1679 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001680
Antoine Pitrou480a1242010-04-28 21:37:09 +00001681 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001682 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001683 cert_reqs=ssl.CERT_NONE) as s:
1684 s.connect(self.server_addr)
1685 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001686 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001687
Martin Panter3840b2a2016-03-27 01:53:46 +00001688 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001689 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001690 cert_reqs=ssl.CERT_REQUIRED,
1691 ca_certs=SIGNING_CA) as s:
1692 s.connect(self.server_addr)
1693 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001694 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001695
Martin Panter3840b2a2016-03-27 01:53:46 +00001696 def test_connect_fail(self):
1697 # This should fail because we have no verification certs. Connection
1698 # failure crashes ThreadedEchoServer, so run this in an independent
1699 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001700 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001701 cert_reqs=ssl.CERT_REQUIRED)
1702 self.addCleanup(s.close)
1703 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1704 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001705
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001706 def test_connect_ex(self):
1707 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001708 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001709 cert_reqs=ssl.CERT_REQUIRED,
1710 ca_certs=SIGNING_CA)
1711 self.addCleanup(s.close)
1712 self.assertEqual(0, s.connect_ex(self.server_addr))
1713 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001714
1715 def test_non_blocking_connect_ex(self):
1716 # Issue #11326: non-blocking connect_ex() should allow handshake
1717 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001718 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001719 cert_reqs=ssl.CERT_REQUIRED,
1720 ca_certs=SIGNING_CA,
1721 do_handshake_on_connect=False)
1722 self.addCleanup(s.close)
1723 s.setblocking(False)
1724 rc = s.connect_ex(self.server_addr)
1725 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1726 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1727 # Wait for connect to finish
1728 select.select([], [s], [], 5.0)
1729 # Non-blocking handshake
1730 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001731 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001732 s.do_handshake()
1733 break
1734 except ssl.SSLWantReadError:
1735 select.select([s], [], [], 5.0)
1736 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001737 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001738 # SSL established
1739 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001740
Antoine Pitrou152efa22010-05-16 18:19:27 +00001741 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001742 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001743 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001744 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1745 s.connect(self.server_addr)
1746 self.assertEqual({}, s.getpeercert())
1747 # Same with a server hostname
1748 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1749 server_hostname="dummy") as s:
1750 s.connect(self.server_addr)
1751 ctx.verify_mode = ssl.CERT_REQUIRED
1752 # This should succeed because we specify the root cert
1753 ctx.load_verify_locations(SIGNING_CA)
1754 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1755 s.connect(self.server_addr)
1756 cert = s.getpeercert()
1757 self.assertTrue(cert)
1758
1759 def test_connect_with_context_fail(self):
1760 # This should fail because we have no verification certs. Connection
1761 # failure crashes ThreadedEchoServer, so run this in an independent
1762 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001763 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001764 ctx.verify_mode = ssl.CERT_REQUIRED
1765 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1766 self.addCleanup(s.close)
1767 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1768 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001769
1770 def test_connect_capath(self):
1771 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001772 # NOTE: the subject hashing algorithm has been changed between
1773 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1774 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001775 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001776 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001777 ctx.verify_mode = ssl.CERT_REQUIRED
1778 ctx.load_verify_locations(capath=CAPATH)
1779 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1780 s.connect(self.server_addr)
1781 cert = s.getpeercert()
1782 self.assertTrue(cert)
1783 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001784 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 ctx.verify_mode = ssl.CERT_REQUIRED
1786 ctx.load_verify_locations(capath=BYTES_CAPATH)
1787 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1788 s.connect(self.server_addr)
1789 cert = s.getpeercert()
1790 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001791
Christian Heimesefff7062013-11-21 03:35:02 +01001792 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001793 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001794 pem = f.read()
1795 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001796 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001797 ctx.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001798 # TODO: fix TLSv1.3 support
1799 ctx.options |= ssl.OP_NO_TLSv1_3
Martin Panter3840b2a2016-03-27 01:53:46 +00001800 ctx.load_verify_locations(cadata=pem)
1801 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1802 s.connect(self.server_addr)
1803 cert = s.getpeercert()
1804 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001805
Martin Panter3840b2a2016-03-27 01:53:46 +00001806 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001807 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001808 ctx.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001809 # TODO: fix TLSv1.3 support
1810 ctx.options |= ssl.OP_NO_TLSv1_3
Martin Panter3840b2a2016-03-27 01:53:46 +00001811 ctx.load_verify_locations(cadata=der)
1812 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1813 s.connect(self.server_addr)
1814 cert = s.getpeercert()
1815 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001816
Antoine Pitroue3220242010-04-24 11:13:53 +00001817 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1818 def test_makefile_close(self):
1819 # Issue #5238: creating a file-like object with makefile() shouldn't
1820 # delay closing the underlying "real socket" (here tested with its
1821 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001822 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001823 ss.connect(self.server_addr)
1824 fd = ss.fileno()
1825 f = ss.makefile()
1826 f.close()
1827 # The fd is still open
1828 os.read(fd, 0)
1829 # Closing the SSL socket should close the fd too
1830 ss.close()
1831 gc.collect()
1832 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001833 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001834 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001835
Antoine Pitrou480a1242010-04-28 21:37:09 +00001836 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001837 s = socket.socket(socket.AF_INET)
1838 s.connect(self.server_addr)
1839 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001840 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001841 cert_reqs=ssl.CERT_NONE,
1842 do_handshake_on_connect=False)
1843 self.addCleanup(s.close)
1844 count = 0
1845 while True:
1846 try:
1847 count += 1
1848 s.do_handshake()
1849 break
1850 except ssl.SSLWantReadError:
1851 select.select([s], [], [])
1852 except ssl.SSLWantWriteError:
1853 select.select([], [s], [])
1854 if support.verbose:
1855 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856
Antoine Pitrou480a1242010-04-28 21:37:09 +00001857 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001858 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001859
Martin Panter3840b2a2016-03-27 01:53:46 +00001860 def test_get_server_certificate_fail(self):
1861 # Connection failure crashes ThreadedEchoServer, so run this in an
1862 # independent test method
1863 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001864
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001865 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001866 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001867 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1868 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001869 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001870 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1871 s.connect(self.server_addr)
1872 # Error checking can happen at instantiation or when connecting
1873 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1874 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001875 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001876 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1877 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001878
Christian Heimes9a5395a2013-06-17 15:44:12 +02001879 def test_get_ca_certs_capath(self):
1880 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001881 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001882 ctx.load_verify_locations(capath=CAPATH)
1883 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001884 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1885 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001886 s.connect(self.server_addr)
1887 cert = s.getpeercert()
1888 self.assertTrue(cert)
1889 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001890
Christian Heimes575596e2013-12-15 21:49:17 +01001891 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001892 def test_context_setget(self):
1893 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001894 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1895 ctx1.load_verify_locations(capath=CAPATH)
1896 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1897 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001898 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001899 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001900 ss.connect(self.server_addr)
1901 self.assertIs(ss.context, ctx1)
1902 self.assertIs(ss._sslobj.context, ctx1)
1903 ss.context = ctx2
1904 self.assertIs(ss.context, ctx2)
1905 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001906
1907 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1908 # A simple IO loop. Call func(*args) depending on the error we get
1909 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1910 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001911 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001912 count = 0
1913 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001914 if time.monotonic() > deadline:
1915 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001916 errno = None
1917 count += 1
1918 try:
1919 ret = func(*args)
1920 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001921 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001922 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001923 raise
1924 errno = e.errno
1925 # Get any data from the outgoing BIO irrespective of any error, and
1926 # send it to the socket.
1927 buf = outgoing.read()
1928 sock.sendall(buf)
1929 # If there's no error, we're done. For WANT_READ, we need to get
1930 # data from the socket and put it in the incoming BIO.
1931 if errno is None:
1932 break
1933 elif errno == ssl.SSL_ERROR_WANT_READ:
1934 buf = sock.recv(32768)
1935 if buf:
1936 incoming.write(buf)
1937 else:
1938 incoming.write_eof()
1939 if support.verbose:
1940 sys.stdout.write("Needed %d calls to complete %s().\n"
1941 % (count, func.__name__))
1942 return ret
1943
Martin Panter3840b2a2016-03-27 01:53:46 +00001944 def test_bio_handshake(self):
1945 sock = socket.socket(socket.AF_INET)
1946 self.addCleanup(sock.close)
1947 sock.connect(self.server_addr)
1948 incoming = ssl.MemoryBIO()
1949 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001950 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1951 self.assertTrue(ctx.check_hostname)
1952 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001953 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001954 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1955 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001956 self.assertIs(sslobj._sslobj.owner, sslobj)
1957 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001958 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001959 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001960 self.assertRaises(ValueError, sslobj.getpeercert)
1961 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1962 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1963 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1964 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001965 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001966 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001967 self.assertTrue(sslobj.getpeercert())
1968 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1969 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1970 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001971 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001972 except ssl.SSLSyscallError:
1973 # If the server shuts down the TCP connection without sending a
1974 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1975 pass
1976 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1977
1978 def test_bio_read_write_data(self):
1979 sock = socket.socket(socket.AF_INET)
1980 self.addCleanup(sock.close)
1981 sock.connect(self.server_addr)
1982 incoming = ssl.MemoryBIO()
1983 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001984 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 ctx.verify_mode = ssl.CERT_NONE
1986 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1987 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1988 req = b'FOO\n'
1989 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1990 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1991 self.assertEqual(buf, b'foo\n')
1992 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001993
1994
Martin Panter3840b2a2016-03-27 01:53:46 +00001995class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001996
Martin Panter3840b2a2016-03-27 01:53:46 +00001997 def test_timeout_connect_ex(self):
1998 # Issue #12065: on a timeout, connect_ex() should return the original
1999 # errno (mimicking the behaviour of non-SSL sockets).
2000 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002001 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002002 cert_reqs=ssl.CERT_REQUIRED,
2003 do_handshake_on_connect=False)
2004 self.addCleanup(s.close)
2005 s.settimeout(0.0000001)
2006 rc = s.connect_ex((REMOTE_HOST, 443))
2007 if rc == 0:
2008 self.skipTest("REMOTE_HOST responded too quickly")
2009 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2010
2011 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2012 def test_get_server_certificate_ipv6(self):
2013 with support.transient_internet('ipv6.google.com'):
2014 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2015 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2016
Martin Panter3840b2a2016-03-27 01:53:46 +00002017
2018def _test_get_server_certificate(test, host, port, cert=None):
2019 pem = ssl.get_server_certificate((host, port))
2020 if not pem:
2021 test.fail("No server certificate on %s:%s!" % (host, port))
2022
2023 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2024 if not pem:
2025 test.fail("No server certificate on %s:%s!" % (host, port))
2026 if support.verbose:
2027 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2028
2029def _test_get_server_certificate_fail(test, host, port):
2030 try:
2031 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2032 except ssl.SSLError as x:
2033 #should fail
2034 if support.verbose:
2035 sys.stdout.write("%s\n" % x)
2036 else:
2037 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2038
2039
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002040from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002041
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002042class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002043
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002044 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002045
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002046 """A mildly complicated class, because we want it to work both
2047 with and without the SSL wrapper around the socket connection, so
2048 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002049
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002050 def __init__(self, server, connsock, addr):
2051 self.server = server
2052 self.running = False
2053 self.sock = connsock
2054 self.addr = addr
2055 self.sock.setblocking(1)
2056 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002057 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002058 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002059
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002060 def wrap_conn(self):
2061 try:
2062 self.sslconn = self.server.context.wrap_socket(
2063 self.sock, server_side=True)
2064 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2065 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
2066 except (ssl.SSLError, ConnectionResetError, OSError) as e:
2067 # We treat ConnectionResetError as though it were an
2068 # SSLError - OpenSSL on Ubuntu abruptly closes the
2069 # connection when asked to use an unsupported protocol.
2070 #
2071 # OSError may occur with wrong protocols, e.g. both
2072 # sides use PROTOCOL_TLS_SERVER.
2073 #
2074 # XXX Various errors can have happened here, for example
2075 # a mismatching protocol version, an invalid certificate,
2076 # or a low-level bug. This should be made more discriminating.
2077 #
2078 # bpo-31323: Store the exception as string to prevent
2079 # a reference leak: server -> conn_errors -> exception
2080 # -> traceback -> self (ConnectionHandler) -> server
2081 self.server.conn_errors.append(str(e))
2082 if self.server.chatty:
2083 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2084 self.running = False
2085 self.server.stop()
2086 self.close()
2087 return False
2088 else:
2089 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2090 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2091 cert = self.sslconn.getpeercert()
2092 if support.verbose and self.server.chatty:
2093 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2094 cert_binary = self.sslconn.getpeercert(True)
2095 if support.verbose and self.server.chatty:
2096 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2097 cipher = self.sslconn.cipher()
2098 if support.verbose and self.server.chatty:
2099 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2100 sys.stdout.write(" server: selected protocol is now "
2101 + str(self.sslconn.selected_npn_protocol()) + "\n")
2102 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002104 def read(self):
2105 if self.sslconn:
2106 return self.sslconn.read()
2107 else:
2108 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002109
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002110 def write(self, bytes):
2111 if self.sslconn:
2112 return self.sslconn.write(bytes)
2113 else:
2114 return self.sock.send(bytes)
2115
2116 def close(self):
2117 if self.sslconn:
2118 self.sslconn.close()
2119 else:
2120 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002121
Antoine Pitrou480a1242010-04-28 21:37:09 +00002122 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002123 self.running = True
2124 if not self.server.starttls_server:
2125 if not self.wrap_conn():
2126 return
2127 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002128 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002129 msg = self.read()
2130 stripped = msg.strip()
2131 if not stripped:
2132 # eof, so quit this handler
2133 self.running = False
2134 try:
2135 self.sock = self.sslconn.unwrap()
2136 except OSError:
2137 # Many tests shut the TCP connection down
2138 # without an SSL shutdown. This causes
2139 # unwrap() to raise OSError with errno=0!
2140 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002141 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002142 self.sslconn = None
2143 self.close()
2144 elif stripped == b'over':
2145 if support.verbose and self.server.connectionchatty:
2146 sys.stdout.write(" server: client closed connection\n")
2147 self.close()
2148 return
2149 elif (self.server.starttls_server and
2150 stripped == b'STARTTLS'):
2151 if support.verbose and self.server.connectionchatty:
2152 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2153 self.write(b"OK\n")
2154 if not self.wrap_conn():
2155 return
2156 elif (self.server.starttls_server and self.sslconn
2157 and stripped == b'ENDTLS'):
2158 if support.verbose and self.server.connectionchatty:
2159 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2160 self.write(b"OK\n")
2161 self.sock = self.sslconn.unwrap()
2162 self.sslconn = None
2163 if support.verbose and self.server.connectionchatty:
2164 sys.stdout.write(" server: connection is now unencrypted...\n")
2165 elif stripped == b'CB tls-unique':
2166 if support.verbose and self.server.connectionchatty:
2167 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2168 data = self.sslconn.get_channel_binding("tls-unique")
2169 self.write(repr(data).encode("us-ascii") + b"\n")
2170 else:
2171 if (support.verbose and
2172 self.server.connectionchatty):
2173 ctype = (self.sslconn and "encrypted") or "unencrypted"
2174 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2175 % (msg, ctype, msg.lower(), ctype))
2176 self.write(msg.lower())
2177 except OSError:
2178 if self.server.chatty:
2179 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002180 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002181 self.running = False
2182 # normally, we'd just stop here, but for the test
2183 # harness, we want to stop the server
2184 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002185
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002186 def __init__(self, certificate=None, ssl_version=None,
2187 certreqs=None, cacerts=None,
2188 chatty=True, connectionchatty=False, starttls_server=False,
2189 npn_protocols=None, alpn_protocols=None,
2190 ciphers=None, context=None):
2191 if context:
2192 self.context = context
2193 else:
2194 self.context = ssl.SSLContext(ssl_version
2195 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002196 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002197 self.context.verify_mode = (certreqs if certreqs is not None
2198 else ssl.CERT_NONE)
2199 if cacerts:
2200 self.context.load_verify_locations(cacerts)
2201 if certificate:
2202 self.context.load_cert_chain(certificate)
2203 if npn_protocols:
2204 self.context.set_npn_protocols(npn_protocols)
2205 if alpn_protocols:
2206 self.context.set_alpn_protocols(alpn_protocols)
2207 if ciphers:
2208 self.context.set_ciphers(ciphers)
2209 self.chatty = chatty
2210 self.connectionchatty = connectionchatty
2211 self.starttls_server = starttls_server
2212 self.sock = socket.socket()
2213 self.port = support.bind_port(self.sock)
2214 self.flag = None
2215 self.active = False
2216 self.selected_npn_protocols = []
2217 self.selected_alpn_protocols = []
2218 self.shared_ciphers = []
2219 self.conn_errors = []
2220 threading.Thread.__init__(self)
2221 self.daemon = True
2222
2223 def __enter__(self):
2224 self.start(threading.Event())
2225 self.flag.wait()
2226 return self
2227
2228 def __exit__(self, *args):
2229 self.stop()
2230 self.join()
2231
2232 def start(self, flag=None):
2233 self.flag = flag
2234 threading.Thread.start(self)
2235
2236 def run(self):
2237 self.sock.settimeout(0.05)
2238 self.sock.listen()
2239 self.active = True
2240 if self.flag:
2241 # signal an event
2242 self.flag.set()
2243 while self.active:
2244 try:
2245 newconn, connaddr = self.sock.accept()
2246 if support.verbose and self.chatty:
2247 sys.stdout.write(' server: new connection from '
2248 + repr(connaddr) + '\n')
2249 handler = self.ConnectionHandler(self, newconn, connaddr)
2250 handler.start()
2251 handler.join()
2252 except socket.timeout:
2253 pass
2254 except KeyboardInterrupt:
2255 self.stop()
2256 self.sock.close()
2257
2258 def stop(self):
2259 self.active = False
2260
2261class AsyncoreEchoServer(threading.Thread):
2262
2263 # this one's based on asyncore.dispatcher
2264
2265 class EchoServer (asyncore.dispatcher):
2266
2267 class ConnectionHandler(asyncore.dispatcher_with_send):
2268
2269 def __init__(self, conn, certfile):
2270 self.socket = test_wrap_socket(conn, server_side=True,
2271 certfile=certfile,
2272 do_handshake_on_connect=False)
2273 asyncore.dispatcher_with_send.__init__(self, self.socket)
2274 self._ssl_accepting = True
2275 self._do_ssl_handshake()
2276
2277 def readable(self):
2278 if isinstance(self.socket, ssl.SSLSocket):
2279 while self.socket.pending() > 0:
2280 self.handle_read_event()
2281 return True
2282
2283 def _do_ssl_handshake(self):
2284 try:
2285 self.socket.do_handshake()
2286 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2287 return
2288 except ssl.SSLEOFError:
2289 return self.handle_close()
2290 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002291 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002292 except OSError as err:
2293 if err.args[0] == errno.ECONNABORTED:
2294 return self.handle_close()
2295 else:
2296 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002297
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002298 def handle_read(self):
2299 if self._ssl_accepting:
2300 self._do_ssl_handshake()
2301 else:
2302 data = self.recv(1024)
2303 if support.verbose:
2304 sys.stdout.write(" server: read %s from client\n" % repr(data))
2305 if not data:
2306 self.close()
2307 else:
2308 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002309
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002310 def handle_close(self):
2311 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002312 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002313 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002314
2315 def handle_error(self):
2316 raise
2317
Trent Nelson78520002008-04-10 20:54:35 +00002318 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002319 self.certfile = certfile
2320 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2321 self.port = support.bind_port(sock, '')
2322 asyncore.dispatcher.__init__(self, sock)
2323 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002326 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2328 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002329
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002330 def handle_error(self):
2331 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002332
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002333 def __init__(self, certfile):
2334 self.flag = None
2335 self.active = False
2336 self.server = self.EchoServer(certfile)
2337 self.port = self.server.port
2338 threading.Thread.__init__(self)
2339 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002340
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002341 def __str__(self):
2342 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002343
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002344 def __enter__(self):
2345 self.start(threading.Event())
2346 self.flag.wait()
2347 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002348
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002349 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002350 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002351 sys.stdout.write(" cleanup: stopping server.\n")
2352 self.stop()
2353 if support.verbose:
2354 sys.stdout.write(" cleanup: joining server thread.\n")
2355 self.join()
2356 if support.verbose:
2357 sys.stdout.write(" cleanup: successfully joined.\n")
2358 # make sure that ConnectionHandler is removed from socket_map
2359 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002360
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002361 def start (self, flag=None):
2362 self.flag = flag
2363 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002364
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002365 def run(self):
2366 self.active = True
2367 if self.flag:
2368 self.flag.set()
2369 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002370 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002371 asyncore.loop(1)
2372 except:
2373 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002374
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002375 def stop(self):
2376 self.active = False
2377 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002378
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002379def server_params_test(client_context, server_context, indata=b"FOO\n",
2380 chatty=True, connectionchatty=False, sni_name=None,
2381 session=None):
2382 """
2383 Launch a server, connect a client to it and try various reads
2384 and writes.
2385 """
2386 stats = {}
2387 server = ThreadedEchoServer(context=server_context,
2388 chatty=chatty,
2389 connectionchatty=False)
2390 with server:
2391 with client_context.wrap_socket(socket.socket(),
2392 server_hostname=sni_name, session=session) as s:
2393 s.connect((HOST, server.port))
2394 for arg in [indata, bytearray(indata), memoryview(indata)]:
2395 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002396 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002397 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002398 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002399 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002400 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002401 if connectionchatty:
2402 if support.verbose:
2403 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002404 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002405 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002406 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2407 % (outdata[:20], len(outdata),
2408 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002409 s.write(b"over\n")
2410 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002411 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002412 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002413 stats.update({
2414 'compression': s.compression(),
2415 'cipher': s.cipher(),
2416 'peercert': s.getpeercert(),
2417 'client_alpn_protocol': s.selected_alpn_protocol(),
2418 'client_npn_protocol': s.selected_npn_protocol(),
2419 'version': s.version(),
2420 'session_reused': s.session_reused,
2421 'session': s.session,
2422 })
2423 s.close()
2424 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2425 stats['server_npn_protocols'] = server.selected_npn_protocols
2426 stats['server_shared_ciphers'] = server.shared_ciphers
2427 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002428
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002429def try_protocol_combo(server_protocol, client_protocol, expect_success,
2430 certsreqs=None, server_options=0, client_options=0):
2431 """
2432 Try to SSL-connect using *client_protocol* to *server_protocol*.
2433 If *expect_success* is true, assert that the connection succeeds,
2434 if it's false, assert that the connection fails.
2435 Also, if *expect_success* is a string, assert that it is the protocol
2436 version actually used by the connection.
2437 """
2438 if certsreqs is None:
2439 certsreqs = ssl.CERT_NONE
2440 certtype = {
2441 ssl.CERT_NONE: "CERT_NONE",
2442 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2443 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2444 }[certsreqs]
2445 if support.verbose:
2446 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2447 sys.stdout.write(formatstr %
2448 (ssl.get_protocol_name(client_protocol),
2449 ssl.get_protocol_name(server_protocol),
2450 certtype))
2451 client_context = ssl.SSLContext(client_protocol)
2452 client_context.options |= client_options
2453 server_context = ssl.SSLContext(server_protocol)
2454 server_context.options |= server_options
2455
2456 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2457 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2458 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002459 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002460 client_context.set_ciphers("ALL")
2461
2462 for ctx in (client_context, server_context):
2463 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002464 ctx.load_cert_chain(SIGNED_CERTFILE)
2465 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002466 try:
2467 stats = server_params_test(client_context, server_context,
2468 chatty=False, connectionchatty=False)
2469 # Protocol mismatch can result in either an SSLError, or a
2470 # "Connection reset by peer" error.
2471 except ssl.SSLError:
2472 if expect_success:
2473 raise
2474 except OSError as e:
2475 if expect_success or e.errno != errno.ECONNRESET:
2476 raise
2477 else:
2478 if not expect_success:
2479 raise AssertionError(
2480 "Client protocol %s succeeded with server protocol %s!"
2481 % (ssl.get_protocol_name(client_protocol),
2482 ssl.get_protocol_name(server_protocol)))
2483 elif (expect_success is not True
2484 and expect_success != stats['version']):
2485 raise AssertionError("version mismatch: expected %r, got %r"
2486 % (expect_success, stats['version']))
2487
2488
2489class ThreadedTests(unittest.TestCase):
2490
2491 @skip_if_broken_ubuntu_ssl
2492 def test_echo(self):
2493 """Basic test of an SSL client connecting to a server"""
2494 if support.verbose:
2495 sys.stdout.write("\n")
2496 for protocol in PROTOCOLS:
2497 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2498 continue
2499 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2500 context = ssl.SSLContext(protocol)
2501 context.load_cert_chain(CERTFILE)
2502 server_params_test(context, context,
2503 chatty=True, connectionchatty=True)
2504
Christian Heimesa170fa12017-09-15 20:27:30 +02002505 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002506
2507 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2508 server_params_test(client_context=client_context,
2509 server_context=server_context,
2510 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002511 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002512
2513 client_context.check_hostname = False
2514 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2515 with self.assertRaises(ssl.SSLError) as e:
2516 server_params_test(client_context=server_context,
2517 server_context=client_context,
2518 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002519 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002520 self.assertIn('called a function you should not call',
2521 str(e.exception))
2522
2523 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2524 with self.assertRaises(ssl.SSLError) as e:
2525 server_params_test(client_context=server_context,
2526 server_context=server_context,
2527 chatty=True, connectionchatty=True)
2528 self.assertIn('called a function you should not call',
2529 str(e.exception))
2530
2531 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2532 with self.assertRaises(ssl.SSLError) as e:
2533 server_params_test(client_context=server_context,
2534 server_context=client_context,
2535 chatty=True, connectionchatty=True)
2536 self.assertIn('called a function you should not call',
2537 str(e.exception))
2538
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002539 def test_getpeercert(self):
2540 if support.verbose:
2541 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002542
2543 client_context, server_context, hostname = testing_context()
2544 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002545 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002546 with client_context.wrap_socket(socket.socket(),
2547 do_handshake_on_connect=False,
2548 server_hostname=hostname) as s:
2549 s.connect((HOST, server.port))
2550 # getpeercert() raise ValueError while the handshake isn't
2551 # done.
2552 with self.assertRaises(ValueError):
2553 s.getpeercert()
2554 s.do_handshake()
2555 cert = s.getpeercert()
2556 self.assertTrue(cert, "Can't get peer certificate.")
2557 cipher = s.cipher()
2558 if support.verbose:
2559 sys.stdout.write(pprint.pformat(cert) + '\n')
2560 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2561 if 'subject' not in cert:
2562 self.fail("No subject field in certificate: %s." %
2563 pprint.pformat(cert))
2564 if ((('organizationName', 'Python Software Foundation'),)
2565 not in cert['subject']):
2566 self.fail(
2567 "Missing or invalid 'organizationName' field in certificate subject; "
2568 "should be 'Python Software Foundation'.")
2569 self.assertIn('notBefore', cert)
2570 self.assertIn('notAfter', cert)
2571 before = ssl.cert_time_to_seconds(cert['notBefore'])
2572 after = ssl.cert_time_to_seconds(cert['notAfter'])
2573 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002574
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002575 @unittest.skipUnless(have_verify_flags(),
2576 "verify_flags need OpenSSL > 0.9.8")
2577 def test_crl_check(self):
2578 if support.verbose:
2579 sys.stdout.write("\n")
2580
Christian Heimesa170fa12017-09-15 20:27:30 +02002581 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002582
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002583 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002584 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002585
2586 # VERIFY_DEFAULT should pass
2587 server = ThreadedEchoServer(context=server_context, chatty=True)
2588 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002589 with client_context.wrap_socket(socket.socket(),
2590 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002591 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002592 cert = s.getpeercert()
2593 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002594
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002595 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002596 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002597
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002598 server = ThreadedEchoServer(context=server_context, chatty=True)
2599 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002600 with client_context.wrap_socket(socket.socket(),
2601 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002602 with self.assertRaisesRegex(ssl.SSLError,
2603 "certificate verify failed"):
2604 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002606 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002607 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002608
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002609 server = ThreadedEchoServer(context=server_context, chatty=True)
2610 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002611 with client_context.wrap_socket(socket.socket(),
2612 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002613 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002614 cert = s.getpeercert()
2615 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002616
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002617 def test_check_hostname(self):
2618 if support.verbose:
2619 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002620
Christian Heimesa170fa12017-09-15 20:27:30 +02002621 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002622
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002623 # correct hostname should verify
2624 server = ThreadedEchoServer(context=server_context, chatty=True)
2625 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002626 with client_context.wrap_socket(socket.socket(),
2627 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002628 s.connect((HOST, server.port))
2629 cert = s.getpeercert()
2630 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002631
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002632 # incorrect hostname should raise an exception
2633 server = ThreadedEchoServer(context=server_context, chatty=True)
2634 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002635 with client_context.wrap_socket(socket.socket(),
2636 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002637 with self.assertRaisesRegex(
2638 ssl.CertificateError,
2639 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002640 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002641
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002642 # missing server_hostname arg should cause an exception, too
2643 server = ThreadedEchoServer(context=server_context, chatty=True)
2644 with server:
2645 with socket.socket() as s:
2646 with self.assertRaisesRegex(ValueError,
2647 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002648 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002649
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002650 def test_ecc_cert(self):
2651 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2652 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002653 client_context.set_ciphers(
2654 'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
2655 'ECDHE:ECDSA:!NULL:!aRSA'
2656 )
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002657 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2658
2659 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2660 # load ECC cert
2661 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2662
2663 # correct hostname should verify
2664 server = ThreadedEchoServer(context=server_context, chatty=True)
2665 with server:
2666 with client_context.wrap_socket(socket.socket(),
2667 server_hostname=hostname) as s:
2668 s.connect((HOST, server.port))
2669 cert = s.getpeercert()
2670 self.assertTrue(cert, "Can't get peer certificate.")
2671 cipher = s.cipher()[0].split('-')
2672 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2673
2674 def test_dual_rsa_ecc(self):
2675 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2676 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002677 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2678 # algorithms.
2679 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002680 # only ECDSA certs
2681 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2682 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2683
2684 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2685 # load ECC and RSA key/cert pairs
2686 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2687 server_context.load_cert_chain(SIGNED_CERTFILE)
2688
2689 # correct hostname should verify
2690 server = ThreadedEchoServer(context=server_context, chatty=True)
2691 with server:
2692 with client_context.wrap_socket(socket.socket(),
2693 server_hostname=hostname) as s:
2694 s.connect((HOST, server.port))
2695 cert = s.getpeercert()
2696 self.assertTrue(cert, "Can't get peer certificate.")
2697 cipher = s.cipher()[0].split('-')
2698 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2699
Christian Heimes66e57422018-01-29 14:25:13 +01002700 def test_check_hostname_idn(self):
2701 if support.verbose:
2702 sys.stdout.write("\n")
2703
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002704 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002705 server_context.load_cert_chain(IDNSANSFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002706 # TODO: fix TLSv1.3 support
2707 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimes66e57422018-01-29 14:25:13 +01002708
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002709 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002710 context.verify_mode = ssl.CERT_REQUIRED
2711 context.check_hostname = True
2712 context.load_verify_locations(SIGNING_CA)
2713
2714 # correct hostname should verify, when specified in several
2715 # different ways
2716 idn_hostnames = [
2717 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002718 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002719 ('xn--knig-5qa.idn.pythontest.net',
2720 'xn--knig-5qa.idn.pythontest.net'),
2721 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002722 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002723
2724 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002725 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002726 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2727 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2728 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002729 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2730
2731 # ('königsgäßchen.idna2008.pythontest.net',
2732 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2733 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2734 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2735 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2736 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2737
Christian Heimes66e57422018-01-29 14:25:13 +01002738 ]
2739 for server_hostname, expected_hostname in idn_hostnames:
2740 server = ThreadedEchoServer(context=server_context, chatty=True)
2741 with server:
2742 with context.wrap_socket(socket.socket(),
2743 server_hostname=server_hostname) as s:
2744 self.assertEqual(s.server_hostname, expected_hostname)
2745 s.connect((HOST, server.port))
2746 cert = s.getpeercert()
2747 self.assertEqual(s.server_hostname, expected_hostname)
2748 self.assertTrue(cert, "Can't get peer certificate.")
2749
Christian Heimes66e57422018-01-29 14:25:13 +01002750 # incorrect hostname should raise an exception
2751 server = ThreadedEchoServer(context=server_context, chatty=True)
2752 with server:
2753 with context.wrap_socket(socket.socket(),
2754 server_hostname="python.example.org") as s:
2755 with self.assertRaises(ssl.CertificateError):
2756 s.connect((HOST, server.port))
2757
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002758 def test_wrong_cert(self):
2759 """Connecting when the server rejects the client's certificate
2760
2761 Launch a server with CERT_REQUIRED, and check that trying to
2762 connect to it with a wrong client certificate fails.
2763 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002764 client_context, server_context, hostname = testing_context()
2765 # load client cert
2766 client_context.load_cert_chain(WRONG_CERT)
2767 # require TLS client authentication
2768 server_context.verify_mode = ssl.CERT_REQUIRED
2769 # TODO: fix TLSv1.3 support
2770 # With TLS 1.3, test fails with exception in server thread
2771 server_context.options |= ssl.OP_NO_TLSv1_3
2772
2773 server = ThreadedEchoServer(
2774 context=server_context, chatty=True, connectionchatty=True,
2775 )
2776
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002777 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002778 client_context.wrap_socket(socket.socket(),
2779 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002780 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002781 # Expect either an SSL error about the server rejecting
2782 # the connection, or a low-level connection reset (which
2783 # sometimes happens on Windows)
2784 s.connect((HOST, server.port))
2785 except ssl.SSLError as e:
2786 if support.verbose:
2787 sys.stdout.write("\nSSLError is %r\n" % e)
2788 except OSError as e:
2789 if e.errno != errno.ECONNRESET:
2790 raise
2791 if support.verbose:
2792 sys.stdout.write("\nsocket.error is %r\n" % e)
2793 else:
2794 self.fail("Use of invalid cert should have failed!")
2795
2796 def test_rude_shutdown(self):
2797 """A brutal shutdown of an SSL server should raise an OSError
2798 in the client when attempting handshake.
2799 """
2800 listener_ready = threading.Event()
2801 listener_gone = threading.Event()
2802
2803 s = socket.socket()
2804 port = support.bind_port(s, HOST)
2805
2806 # `listener` runs in a thread. It sits in an accept() until
2807 # the main thread connects. Then it rudely closes the socket,
2808 # and sets Event `listener_gone` to let the main thread know
2809 # the socket is gone.
2810 def listener():
2811 s.listen()
2812 listener_ready.set()
2813 newsock, addr = s.accept()
2814 newsock.close()
2815 s.close()
2816 listener_gone.set()
2817
2818 def connector():
2819 listener_ready.wait()
2820 with socket.socket() as c:
2821 c.connect((HOST, port))
2822 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002823 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824 ssl_sock = test_wrap_socket(c)
2825 except OSError:
2826 pass
2827 else:
2828 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002829
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002830 t = threading.Thread(target=listener)
2831 t.start()
2832 try:
2833 connector()
2834 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002835 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002836
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002837 def test_ssl_cert_verify_error(self):
2838 if support.verbose:
2839 sys.stdout.write("\n")
2840
2841 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2842 server_context.load_cert_chain(SIGNED_CERTFILE)
2843
2844 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2845
2846 server = ThreadedEchoServer(context=server_context, chatty=True)
2847 with server:
2848 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002849 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002850 try:
2851 s.connect((HOST, server.port))
2852 except ssl.SSLError as e:
2853 msg = 'unable to get local issuer certificate'
2854 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2855 self.assertEqual(e.verify_code, 20)
2856 self.assertEqual(e.verify_message, msg)
2857 self.assertIn(msg, repr(e))
2858 self.assertIn('certificate verify failed', repr(e))
2859
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002860 @skip_if_broken_ubuntu_ssl
2861 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2862 "OpenSSL is compiled without SSLv2 support")
2863 def test_protocol_sslv2(self):
2864 """Connecting to an SSLv2 server with various client options"""
2865 if support.verbose:
2866 sys.stdout.write("\n")
2867 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2868 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2869 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002870 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002871 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2872 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2873 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2874 # SSLv23 client with specific SSL options
2875 if no_sslv2_implies_sslv3_hello():
2876 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002877 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002878 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002879 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002880 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002881 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002882 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002883
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002884 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002885 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002886 """Connecting to an SSLv23 server with various client options"""
2887 if support.verbose:
2888 sys.stdout.write("\n")
2889 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002890 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002891 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002892 except OSError as x:
2893 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2894 if support.verbose:
2895 sys.stdout.write(
2896 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2897 % str(x))
2898 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002899 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2900 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2901 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002902
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002903 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002904 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2905 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2906 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002907
2908 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002909 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2910 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2911 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912
2913 # Server with specific SSL options
2914 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002915 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002916 server_options=ssl.OP_NO_SSLv3)
2917 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002918 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002919 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002920 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002921 server_options=ssl.OP_NO_TLSv1)
2922
2923
2924 @skip_if_broken_ubuntu_ssl
2925 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2926 "OpenSSL is compiled without SSLv3 support")
2927 def test_protocol_sslv3(self):
2928 """Connecting to an SSLv3 server with various client options"""
2929 if support.verbose:
2930 sys.stdout.write("\n")
2931 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2932 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2933 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2934 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2935 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002936 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002937 client_options=ssl.OP_NO_SSLv3)
2938 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2939 if no_sslv2_implies_sslv3_hello():
2940 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002941 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002942 False, client_options=ssl.OP_NO_SSLv2)
2943
2944 @skip_if_broken_ubuntu_ssl
2945 def test_protocol_tlsv1(self):
2946 """Connecting to a TLSv1 server with various client options"""
2947 if support.verbose:
2948 sys.stdout.write("\n")
2949 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2950 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2951 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2952 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2953 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2954 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2955 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002956 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002957 client_options=ssl.OP_NO_TLSv1)
2958
2959 @skip_if_broken_ubuntu_ssl
2960 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2961 "TLS version 1.1 not supported.")
2962 def test_protocol_tlsv1_1(self):
2963 """Connecting to a TLSv1.1 server with various client options.
2964 Testing against older TLS versions."""
2965 if support.verbose:
2966 sys.stdout.write("\n")
2967 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2968 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2969 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2970 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2971 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002972 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 client_options=ssl.OP_NO_TLSv1_1)
2974
Christian Heimesa170fa12017-09-15 20:27:30 +02002975 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002976 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2977 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2978
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002979 @skip_if_broken_ubuntu_ssl
2980 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2981 "TLS version 1.2 not supported.")
2982 def test_protocol_tlsv1_2(self):
2983 """Connecting to a TLSv1.2 server with various client options.
2984 Testing against older TLS versions."""
2985 if support.verbose:
2986 sys.stdout.write("\n")
2987 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2988 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2989 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2990 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2991 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2992 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2993 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002994 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 client_options=ssl.OP_NO_TLSv1_2)
2996
Christian Heimesa170fa12017-09-15 20:27:30 +02002997 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002998 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2999 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3000 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3001 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3002
3003 def test_starttls(self):
3004 """Switching from clear text to encrypted and back again."""
3005 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3006
3007 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003008 starttls_server=True,
3009 chatty=True,
3010 connectionchatty=True)
3011 wrapped = False
3012 with server:
3013 s = socket.socket()
3014 s.setblocking(1)
3015 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003016 if support.verbose:
3017 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003018 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003019 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003020 sys.stdout.write(
3021 " client: sending %r...\n" % indata)
3022 if wrapped:
3023 conn.write(indata)
3024 outdata = conn.read()
3025 else:
3026 s.send(indata)
3027 outdata = s.recv(1024)
3028 msg = outdata.strip().lower()
3029 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3030 # STARTTLS ok, switch to secure mode
3031 if support.verbose:
3032 sys.stdout.write(
3033 " client: read %r from server, starting TLS...\n"
3034 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003035 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003036 wrapped = True
3037 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3038 # ENDTLS ok, switch back to clear text
3039 if support.verbose:
3040 sys.stdout.write(
3041 " client: read %r from server, ending TLS...\n"
3042 % msg)
3043 s = conn.unwrap()
3044 wrapped = False
3045 else:
3046 if support.verbose:
3047 sys.stdout.write(
3048 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003049 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003050 sys.stdout.write(" client: closing connection.\n")
3051 if wrapped:
3052 conn.write(b"over\n")
3053 else:
3054 s.send(b"over\n")
3055 if wrapped:
3056 conn.close()
3057 else:
3058 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003059
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003060 def test_socketserver(self):
3061 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003062 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003063 # try to connect
3064 if support.verbose:
3065 sys.stdout.write('\n')
3066 with open(CERTFILE, 'rb') as f:
3067 d1 = f.read()
3068 d2 = ''
3069 # now fetch the same data from the HTTPS server
3070 url = 'https://localhost:%d/%s' % (
3071 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003072 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003073 f = urllib.request.urlopen(url, context=context)
3074 try:
3075 dlen = f.info().get("content-length")
3076 if dlen and (int(dlen) > 0):
3077 d2 = f.read(int(dlen))
3078 if support.verbose:
3079 sys.stdout.write(
3080 " client: read %d bytes from remote server '%s'\n"
3081 % (len(d2), server))
3082 finally:
3083 f.close()
3084 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 def test_asyncore_server(self):
3087 """Check the example asyncore integration."""
3088 if support.verbose:
3089 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003091 indata = b"FOO\n"
3092 server = AsyncoreEchoServer(CERTFILE)
3093 with server:
3094 s = test_wrap_socket(socket.socket())
3095 s.connect(('127.0.0.1', server.port))
3096 if support.verbose:
3097 sys.stdout.write(
3098 " client: sending %r...\n" % indata)
3099 s.write(indata)
3100 outdata = s.read()
3101 if support.verbose:
3102 sys.stdout.write(" client: read %r\n" % outdata)
3103 if outdata != indata.lower():
3104 self.fail(
3105 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3106 % (outdata[:20], len(outdata),
3107 indata[:20].lower(), len(indata)))
3108 s.write(b"over\n")
3109 if support.verbose:
3110 sys.stdout.write(" client: closing connection.\n")
3111 s.close()
3112 if support.verbose:
3113 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003114
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003115 def test_recv_send(self):
3116 """Test recv(), send() and friends."""
3117 if support.verbose:
3118 sys.stdout.write("\n")
3119
3120 server = ThreadedEchoServer(CERTFILE,
3121 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003122 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003123 cacerts=CERTFILE,
3124 chatty=True,
3125 connectionchatty=False)
3126 with server:
3127 s = test_wrap_socket(socket.socket(),
3128 server_side=False,
3129 certfile=CERTFILE,
3130 ca_certs=CERTFILE,
3131 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003132 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003133 s.connect((HOST, server.port))
3134 # helper methods for standardising recv* method signatures
3135 def _recv_into():
3136 b = bytearray(b"\0"*100)
3137 count = s.recv_into(b)
3138 return b[:count]
3139
3140 def _recvfrom_into():
3141 b = bytearray(b"\0"*100)
3142 count, addr = s.recvfrom_into(b)
3143 return b[:count]
3144
3145 # (name, method, expect success?, *args, return value func)
3146 send_methods = [
3147 ('send', s.send, True, [], len),
3148 ('sendto', s.sendto, False, ["some.address"], len),
3149 ('sendall', s.sendall, True, [], lambda x: None),
3150 ]
3151 # (name, method, whether to expect success, *args)
3152 recv_methods = [
3153 ('recv', s.recv, True, []),
3154 ('recvfrom', s.recvfrom, False, ["some.address"]),
3155 ('recv_into', _recv_into, True, []),
3156 ('recvfrom_into', _recvfrom_into, False, []),
3157 ]
3158 data_prefix = "PREFIX_"
3159
3160 for (meth_name, send_meth, expect_success, args,
3161 ret_val_meth) in send_methods:
3162 indata = (data_prefix + meth_name).encode('ascii')
3163 try:
3164 ret = send_meth(indata, *args)
3165 msg = "sending with {}".format(meth_name)
3166 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3167 outdata = s.read()
3168 if outdata != indata.lower():
3169 self.fail(
3170 "While sending with <<{name:s}>> bad data "
3171 "<<{outdata:r}>> ({nout:d}) received; "
3172 "expected <<{indata:r}>> ({nin:d})\n".format(
3173 name=meth_name, outdata=outdata[:20],
3174 nout=len(outdata),
3175 indata=indata[:20], nin=len(indata)
3176 )
3177 )
3178 except ValueError as e:
3179 if expect_success:
3180 self.fail(
3181 "Failed to send with method <<{name:s}>>; "
3182 "expected to succeed.\n".format(name=meth_name)
3183 )
3184 if not str(e).startswith(meth_name):
3185 self.fail(
3186 "Method <<{name:s}>> failed with unexpected "
3187 "exception message: {exp:s}\n".format(
3188 name=meth_name, exp=e
3189 )
3190 )
3191
3192 for meth_name, recv_meth, expect_success, args in recv_methods:
3193 indata = (data_prefix + meth_name).encode('ascii')
3194 try:
3195 s.send(indata)
3196 outdata = recv_meth(*args)
3197 if outdata != indata.lower():
3198 self.fail(
3199 "While receiving with <<{name:s}>> bad data "
3200 "<<{outdata:r}>> ({nout:d}) received; "
3201 "expected <<{indata:r}>> ({nin:d})\n".format(
3202 name=meth_name, outdata=outdata[:20],
3203 nout=len(outdata),
3204 indata=indata[:20], nin=len(indata)
3205 )
3206 )
3207 except ValueError as e:
3208 if expect_success:
3209 self.fail(
3210 "Failed to receive with method <<{name:s}>>; "
3211 "expected to succeed.\n".format(name=meth_name)
3212 )
3213 if not str(e).startswith(meth_name):
3214 self.fail(
3215 "Method <<{name:s}>> failed with unexpected "
3216 "exception message: {exp:s}\n".format(
3217 name=meth_name, exp=e
3218 )
3219 )
3220 # consume data
3221 s.read()
3222
3223 # read(-1, buffer) is supported, even though read(-1) is not
3224 data = b"data"
3225 s.send(data)
3226 buffer = bytearray(len(data))
3227 self.assertEqual(s.read(-1, buffer), len(data))
3228 self.assertEqual(buffer, data)
3229
Christian Heimes888bbdc2017-09-07 14:18:21 -07003230 # sendall accepts bytes-like objects
3231 if ctypes is not None:
3232 ubyte = ctypes.c_ubyte * len(data)
3233 byteslike = ubyte.from_buffer_copy(data)
3234 s.sendall(byteslike)
3235 self.assertEqual(s.read(), data)
3236
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003237 # Make sure sendmsg et al are disallowed to avoid
3238 # inadvertent disclosure of data and/or corruption
3239 # of the encrypted data stream
3240 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3241 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3242 self.assertRaises(NotImplementedError,
3243 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003244 s.write(b"over\n")
3245
3246 self.assertRaises(ValueError, s.recv, -1)
3247 self.assertRaises(ValueError, s.read, -1)
3248
3249 s.close()
3250
3251 def test_recv_zero(self):
3252 server = ThreadedEchoServer(CERTFILE)
3253 server.__enter__()
3254 self.addCleanup(server.__exit__, None, None)
3255 s = socket.create_connection((HOST, server.port))
3256 self.addCleanup(s.close)
3257 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3258 self.addCleanup(s.close)
3259
3260 # recv/read(0) should return no data
3261 s.send(b"data")
3262 self.assertEqual(s.recv(0), b"")
3263 self.assertEqual(s.read(0), b"")
3264 self.assertEqual(s.read(), b"data")
3265
3266 # Should not block if the other end sends no data
3267 s.setblocking(False)
3268 self.assertEqual(s.recv(0), b"")
3269 self.assertEqual(s.recv_into(bytearray()), 0)
3270
3271 def test_nonblocking_send(self):
3272 server = ThreadedEchoServer(CERTFILE,
3273 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003274 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003275 cacerts=CERTFILE,
3276 chatty=True,
3277 connectionchatty=False)
3278 with server:
3279 s = test_wrap_socket(socket.socket(),
3280 server_side=False,
3281 certfile=CERTFILE,
3282 ca_certs=CERTFILE,
3283 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003284 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003285 s.connect((HOST, server.port))
3286 s.setblocking(False)
3287
3288 # If we keep sending data, at some point the buffers
3289 # will be full and the call will block
3290 buf = bytearray(8192)
3291 def fill_buffer():
3292 while True:
3293 s.send(buf)
3294 self.assertRaises((ssl.SSLWantWriteError,
3295 ssl.SSLWantReadError), fill_buffer)
3296
3297 # Now read all the output and discard it
3298 s.setblocking(True)
3299 s.close()
3300
3301 def test_handshake_timeout(self):
3302 # Issue #5103: SSL handshake must respect the socket timeout
3303 server = socket.socket(socket.AF_INET)
3304 host = "127.0.0.1"
3305 port = support.bind_port(server)
3306 started = threading.Event()
3307 finish = False
3308
3309 def serve():
3310 server.listen()
3311 started.set()
3312 conns = []
3313 while not finish:
3314 r, w, e = select.select([server], [], [], 0.1)
3315 if server in r:
3316 # Let the socket hang around rather than having
3317 # it closed by garbage collection.
3318 conns.append(server.accept()[0])
3319 for sock in conns:
3320 sock.close()
3321
3322 t = threading.Thread(target=serve)
3323 t.start()
3324 started.wait()
3325
3326 try:
3327 try:
3328 c = socket.socket(socket.AF_INET)
3329 c.settimeout(0.2)
3330 c.connect((host, port))
3331 # Will attempt handshake and time out
3332 self.assertRaisesRegex(socket.timeout, "timed out",
3333 test_wrap_socket, c)
3334 finally:
3335 c.close()
3336 try:
3337 c = socket.socket(socket.AF_INET)
3338 c = test_wrap_socket(c)
3339 c.settimeout(0.2)
3340 # Will attempt handshake and time out
3341 self.assertRaisesRegex(socket.timeout, "timed out",
3342 c.connect, (host, port))
3343 finally:
3344 c.close()
3345 finally:
3346 finish = True
3347 t.join()
3348 server.close()
3349
3350 def test_server_accept(self):
3351 # Issue #16357: accept() on a SSLSocket created through
3352 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003353 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003354 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003355 context.load_verify_locations(SIGNING_CA)
3356 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003357 server = socket.socket(socket.AF_INET)
3358 host = "127.0.0.1"
3359 port = support.bind_port(server)
3360 server = context.wrap_socket(server, server_side=True)
3361 self.assertTrue(server.server_side)
3362
3363 evt = threading.Event()
3364 remote = None
3365 peer = None
3366 def serve():
3367 nonlocal remote, peer
3368 server.listen()
3369 # Block on the accept and wait on the connection to close.
3370 evt.set()
3371 remote, peer = server.accept()
3372 remote.recv(1)
3373
3374 t = threading.Thread(target=serve)
3375 t.start()
3376 # Client wait until server setup and perform a connect.
3377 evt.wait()
3378 client = context.wrap_socket(socket.socket())
3379 client.connect((host, port))
3380 client_addr = client.getsockname()
3381 client.close()
3382 t.join()
3383 remote.close()
3384 server.close()
3385 # Sanity checks.
3386 self.assertIsInstance(remote, ssl.SSLSocket)
3387 self.assertEqual(peer, client_addr)
3388
3389 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003390 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003391 with context.wrap_socket(socket.socket()) as sock:
3392 with self.assertRaises(OSError) as cm:
3393 sock.getpeercert()
3394 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3395
3396 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003397 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003398 with context.wrap_socket(socket.socket()) as sock:
3399 with self.assertRaises(OSError) as cm:
3400 sock.do_handshake()
3401 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3402
3403 def test_default_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003404 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003405 try:
3406 # Force a set of weak ciphers on our client context
3407 context.set_ciphers("DES")
3408 except ssl.SSLError:
3409 self.skipTest("no DES cipher available")
3410 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003411 ssl_version=ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003412 chatty=False) as server:
3413 with context.wrap_socket(socket.socket()) as s:
3414 with self.assertRaises(OSError):
3415 s.connect((HOST, server.port))
3416 self.assertIn("no shared cipher", server.conn_errors[0])
3417
3418 def test_version_basic(self):
3419 """
3420 Basic tests for SSLSocket.version().
3421 More tests are done in the test_protocol_*() methods.
3422 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003423 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3424 context.check_hostname = False
3425 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003426 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003427 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003428 chatty=False) as server:
3429 with context.wrap_socket(socket.socket()) as s:
3430 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003431 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003432 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003433 if ssl.OPENSSL_VERSION_INFO >= (1, 1, 1):
3434 self.assertEqual(s.version(), 'TLSv1.3')
3435 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003436 self.assertEqual(s.version(), 'TLSv1.2')
3437 else: # 0.9.8 to 1.0.1
3438 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003439 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003440 self.assertIs(s.version(), None)
3441
Christian Heimescb5b68a2017-09-07 18:07:00 -07003442 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3443 "test requires TLSv1.3 enabled OpenSSL")
3444 def test_tls1_3(self):
3445 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3446 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003447 context.options |= (
3448 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3449 )
3450 with ThreadedEchoServer(context=context) as server:
3451 with context.wrap_socket(socket.socket()) as s:
3452 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003453 self.assertIn(s.cipher()[0], {
Christian Heimescb5b68a2017-09-07 18:07:00 -07003454 'TLS13-AES-256-GCM-SHA384',
3455 'TLS13-CHACHA20-POLY1305-SHA256',
3456 'TLS13-AES-128-GCM-SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003457 })
3458 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003459
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003460 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3461 def test_default_ecdh_curve(self):
3462 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3463 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003464 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003465 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003466 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3467 # cipher name.
3468 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003469 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3470 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3471 # our default cipher list should prefer ECDH-based ciphers
3472 # automatically.
3473 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3474 context.set_ciphers("ECCdraft:ECDH")
3475 with ThreadedEchoServer(context=context) as server:
3476 with context.wrap_socket(socket.socket()) as s:
3477 s.connect((HOST, server.port))
3478 self.assertIn("ECDH", s.cipher()[0])
3479
3480 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3481 "'tls-unique' channel binding not available")
3482 def test_tls_unique_channel_binding(self):
3483 """Test tls-unique channel binding."""
3484 if support.verbose:
3485 sys.stdout.write("\n")
3486
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003487 client_context, server_context, hostname = testing_context()
3488 # TODO: fix TLSv1.3 support
3489 client_context.options |= ssl.OP_NO_TLSv1_3
3490
3491 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003492 chatty=True,
3493 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003494
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003495 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003496 with client_context.wrap_socket(
3497 socket.socket(),
3498 server_hostname=hostname) as s:
3499 s.connect((HOST, server.port))
3500 # get the data
3501 cb_data = s.get_channel_binding("tls-unique")
3502 if support.verbose:
3503 sys.stdout.write(
3504 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003505
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003506 # check if it is sane
3507 self.assertIsNotNone(cb_data)
3508 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003509
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003510 # and compare with the peers version
3511 s.write(b"CB tls-unique\n")
3512 peer_data_repr = s.read().strip()
3513 self.assertEqual(peer_data_repr,
3514 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003515
3516 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003517 with client_context.wrap_socket(
3518 socket.socket(),
3519 server_hostname=hostname) as s:
3520 s.connect((HOST, server.port))
3521 new_cb_data = s.get_channel_binding("tls-unique")
3522 if support.verbose:
3523 sys.stdout.write(
3524 "got another channel binding data: {0!r}\n".format(
3525 new_cb_data)
3526 )
3527 # is it really unique
3528 self.assertNotEqual(cb_data, new_cb_data)
3529 self.assertIsNotNone(cb_data)
3530 self.assertEqual(len(cb_data), 12) # True for TLSv1
3531 s.write(b"CB tls-unique\n")
3532 peer_data_repr = s.read().strip()
3533 self.assertEqual(peer_data_repr,
3534 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003535
3536 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003537 client_context, server_context, hostname = testing_context()
3538 stats = server_params_test(client_context, server_context,
3539 chatty=True, connectionchatty=True,
3540 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003541 if support.verbose:
3542 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3543 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3544
3545 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3546 "ssl.OP_NO_COMPRESSION needed for this test")
3547 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003548 client_context, server_context, hostname = testing_context()
3549 client_context.options |= ssl.OP_NO_COMPRESSION
3550 server_context.options |= ssl.OP_NO_COMPRESSION
3551 stats = server_params_test(client_context, server_context,
3552 chatty=True, connectionchatty=True,
3553 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003554 self.assertIs(stats['compression'], None)
3555
3556 def test_dh_params(self):
3557 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003558 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003559 # test scenario needs TLS <= 1.2
3560 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003561 server_context.load_dh_params(DHFILE)
3562 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003563 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003564 stats = server_params_test(client_context, server_context,
3565 chatty=True, connectionchatty=True,
3566 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003567 cipher = stats["cipher"][0]
3568 parts = cipher.split("-")
3569 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3570 self.fail("Non-DH cipher: " + cipher[0])
3571
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003572 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003573 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003574 def test_ecdh_curve(self):
3575 # server secp384r1, client auto
3576 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003577
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003578 server_context.set_ecdh_curve("secp384r1")
3579 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3580 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3581 stats = server_params_test(client_context, server_context,
3582 chatty=True, connectionchatty=True,
3583 sni_name=hostname)
3584
3585 # server auto, client secp384r1
3586 client_context, server_context, hostname = testing_context()
3587 client_context.set_ecdh_curve("secp384r1")
3588 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3589 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3590 stats = server_params_test(client_context, server_context,
3591 chatty=True, connectionchatty=True,
3592 sni_name=hostname)
3593
3594 # server / client curve mismatch
3595 client_context, server_context, hostname = testing_context()
3596 client_context.set_ecdh_curve("prime256v1")
3597 server_context.set_ecdh_curve("secp384r1")
3598 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3599 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3600 try:
3601 stats = server_params_test(client_context, server_context,
3602 chatty=True, connectionchatty=True,
3603 sni_name=hostname)
3604 except ssl.SSLError:
3605 pass
3606 else:
3607 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003608 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003609 self.fail("mismatch curve did not fail")
3610
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003611 def test_selected_alpn_protocol(self):
3612 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003613 client_context, server_context, hostname = testing_context()
3614 stats = server_params_test(client_context, server_context,
3615 chatty=True, connectionchatty=True,
3616 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003617 self.assertIs(stats['client_alpn_protocol'], None)
3618
3619 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3620 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3621 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003622 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003623 server_context.set_alpn_protocols(['foo', 'bar'])
3624 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003625 chatty=True, connectionchatty=True,
3626 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003627 self.assertIs(stats['client_alpn_protocol'], None)
3628
3629 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3630 def test_alpn_protocols(self):
3631 server_protocols = ['foo', 'bar', 'milkshake']
3632 protocol_tests = [
3633 (['foo', 'bar'], 'foo'),
3634 (['bar', 'foo'], 'foo'),
3635 (['milkshake'], 'milkshake'),
3636 (['http/3.0', 'http/4.0'], None)
3637 ]
3638 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003639 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003640 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003641 client_context.set_alpn_protocols(client_protocols)
3642
3643 try:
3644 stats = server_params_test(client_context,
3645 server_context,
3646 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003647 connectionchatty=True,
3648 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003649 except ssl.SSLError as e:
3650 stats = e
3651
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003652 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003653 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3654 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3655 self.assertIsInstance(stats, ssl.SSLError)
3656 else:
3657 msg = "failed trying %s (s) and %s (c).\n" \
3658 "was expecting %s, but got %%s from the %%s" \
3659 % (str(server_protocols), str(client_protocols),
3660 str(expected))
3661 client_result = stats['client_alpn_protocol']
3662 self.assertEqual(client_result, expected,
3663 msg % (client_result, "client"))
3664 server_result = stats['server_alpn_protocols'][-1] \
3665 if len(stats['server_alpn_protocols']) else 'nothing'
3666 self.assertEqual(server_result, expected,
3667 msg % (server_result, "server"))
3668
3669 def test_selected_npn_protocol(self):
3670 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003671 client_context, server_context, hostname = testing_context()
3672 stats = server_params_test(client_context, server_context,
3673 chatty=True, connectionchatty=True,
3674 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003675 self.assertIs(stats['client_npn_protocol'], None)
3676
3677 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3678 def test_npn_protocols(self):
3679 server_protocols = ['http/1.1', 'spdy/2']
3680 protocol_tests = [
3681 (['http/1.1', 'spdy/2'], 'http/1.1'),
3682 (['spdy/2', 'http/1.1'], 'http/1.1'),
3683 (['spdy/2', 'test'], 'spdy/2'),
3684 (['abc', 'def'], 'abc')
3685 ]
3686 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003687 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003688 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003689 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003690 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003691 chatty=True, connectionchatty=True,
3692 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003693 msg = "failed trying %s (s) and %s (c).\n" \
3694 "was expecting %s, but got %%s from the %%s" \
3695 % (str(server_protocols), str(client_protocols),
3696 str(expected))
3697 client_result = stats['client_npn_protocol']
3698 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3699 server_result = stats['server_npn_protocols'][-1] \
3700 if len(stats['server_npn_protocols']) else 'nothing'
3701 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3702
3703 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003704 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003705 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003706 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003707 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003708 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003709 client_context.load_verify_locations(SIGNING_CA)
3710 return server_context, other_context, client_context
3711
3712 def check_common_name(self, stats, name):
3713 cert = stats['peercert']
3714 self.assertIn((('commonName', name),), cert['subject'])
3715
3716 @needs_sni
3717 def test_sni_callback(self):
3718 calls = []
3719 server_context, other_context, client_context = self.sni_contexts()
3720
Christian Heimesa170fa12017-09-15 20:27:30 +02003721 client_context.check_hostname = False
3722
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003723 def servername_cb(ssl_sock, server_name, initial_context):
3724 calls.append((server_name, initial_context))
3725 if server_name is not None:
3726 ssl_sock.context = other_context
3727 server_context.set_servername_callback(servername_cb)
3728
3729 stats = server_params_test(client_context, server_context,
3730 chatty=True,
3731 sni_name='supermessage')
3732 # The hostname was fetched properly, and the certificate was
3733 # changed for the connection.
3734 self.assertEqual(calls, [("supermessage", server_context)])
3735 # CERTFILE4 was selected
3736 self.check_common_name(stats, 'fakehostname')
3737
3738 calls = []
3739 # The callback is called with server_name=None
3740 stats = server_params_test(client_context, server_context,
3741 chatty=True,
3742 sni_name=None)
3743 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003744 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003745
3746 # Check disabling the callback
3747 calls = []
3748 server_context.set_servername_callback(None)
3749
3750 stats = server_params_test(client_context, server_context,
3751 chatty=True,
3752 sni_name='notfunny')
3753 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003754 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003755 self.assertEqual(calls, [])
3756
3757 @needs_sni
3758 def test_sni_callback_alert(self):
3759 # Returning a TLS alert is reflected to the connecting client
3760 server_context, other_context, client_context = self.sni_contexts()
3761
3762 def cb_returning_alert(ssl_sock, server_name, initial_context):
3763 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3764 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003765 with self.assertRaises(ssl.SSLError) as cm:
3766 stats = server_params_test(client_context, server_context,
3767 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003768 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003770
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003771 @needs_sni
3772 def test_sni_callback_raising(self):
3773 # Raising fails the connection with a TLS handshake failure alert.
3774 server_context, other_context, client_context = self.sni_contexts()
3775
3776 def cb_raising(ssl_sock, server_name, initial_context):
3777 1/0
3778 server_context.set_servername_callback(cb_raising)
3779
3780 with self.assertRaises(ssl.SSLError) as cm, \
3781 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003782 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003783 chatty=False,
3784 sni_name='supermessage')
3785 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3786 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003787
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003788 @needs_sni
3789 def test_sni_callback_wrong_return_type(self):
3790 # Returning the wrong return type terminates the TLS connection
3791 # with an internal error alert.
3792 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003793
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003794 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3795 return "foo"
3796 server_context.set_servername_callback(cb_wrong_return_type)
3797
3798 with self.assertRaises(ssl.SSLError) as cm, \
3799 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003800 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003801 chatty=False,
3802 sni_name='supermessage')
3803 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3804 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003805
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003806 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003807 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003808 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3809 client_context.set_ciphers("AES128:AES256")
3810 server_context.set_ciphers("AES256")
3811 alg1 = "AES256"
3812 alg2 = "AES-256"
3813 else:
3814 client_context.set_ciphers("AES:3DES")
3815 server_context.set_ciphers("3DES")
3816 alg1 = "3DES"
3817 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003818
Christian Heimesa170fa12017-09-15 20:27:30 +02003819 stats = server_params_test(client_context, server_context,
3820 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003821 ciphers = stats['server_shared_ciphers'][0]
3822 self.assertGreater(len(ciphers), 0)
3823 for name, tls_version, bits in ciphers:
3824 if not alg1 in name.split("-") and alg2 not in name:
3825 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003826
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003827 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003828 client_context, server_context, hostname = testing_context()
3829 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003830
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003831 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003832 s = client_context.wrap_socket(socket.socket(),
3833 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003834 s.connect((HOST, server.port))
3835 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003836
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003837 self.assertRaises(ValueError, s.read, 1024)
3838 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003839
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003840 def test_sendfile(self):
3841 TEST_DATA = b"x" * 512
3842 with open(support.TESTFN, 'wb') as f:
3843 f.write(TEST_DATA)
3844 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003845 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003846 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003847 context.load_verify_locations(SIGNING_CA)
3848 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003849 server = ThreadedEchoServer(context=context, chatty=False)
3850 with server:
3851 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003852 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003853 with open(support.TESTFN, 'rb') as file:
3854 s.sendfile(file)
3855 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003856
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003857 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003858 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003859 # TODO: sessions aren't compatible with TLSv1.3 yet
3860 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003861
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003862 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003863 stats = server_params_test(client_context, server_context,
3864 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003865 session = stats['session']
3866 self.assertTrue(session.id)
3867 self.assertGreater(session.time, 0)
3868 self.assertGreater(session.timeout, 0)
3869 self.assertTrue(session.has_ticket)
3870 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3871 self.assertGreater(session.ticket_lifetime_hint, 0)
3872 self.assertFalse(stats['session_reused'])
3873 sess_stat = server_context.session_stats()
3874 self.assertEqual(sess_stat['accept'], 1)
3875 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003876
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003877 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003878 stats = server_params_test(client_context, server_context,
3879 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003880 sess_stat = server_context.session_stats()
3881 self.assertEqual(sess_stat['accept'], 2)
3882 self.assertEqual(sess_stat['hits'], 1)
3883 self.assertTrue(stats['session_reused'])
3884 session2 = stats['session']
3885 self.assertEqual(session2.id, session.id)
3886 self.assertEqual(session2, session)
3887 self.assertIsNot(session2, session)
3888 self.assertGreaterEqual(session2.time, session.time)
3889 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003890
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003891 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003892 stats = server_params_test(client_context, server_context,
3893 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003894 self.assertFalse(stats['session_reused'])
3895 session3 = stats['session']
3896 self.assertNotEqual(session3.id, session.id)
3897 self.assertNotEqual(session3, session)
3898 sess_stat = server_context.session_stats()
3899 self.assertEqual(sess_stat['accept'], 3)
3900 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003901
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003902 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02003903 stats = server_params_test(client_context, server_context,
3904 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003905 self.assertTrue(stats['session_reused'])
3906 session4 = stats['session']
3907 self.assertEqual(session4.id, session.id)
3908 self.assertEqual(session4, session)
3909 self.assertGreaterEqual(session4.time, session.time)
3910 self.assertGreaterEqual(session4.timeout, session.timeout)
3911 sess_stat = server_context.session_stats()
3912 self.assertEqual(sess_stat['accept'], 4)
3913 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003914
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003915 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003916 client_context, server_context, hostname = testing_context()
3917 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02003918
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003919 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02003920 client_context.options |= ssl.OP_NO_TLSv1_3
3921 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07003922
Christian Heimesa170fa12017-09-15 20:27:30 +02003923 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003924 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003925 with client_context.wrap_socket(socket.socket(),
3926 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003927 # session is None before handshake
3928 self.assertEqual(s.session, None)
3929 self.assertEqual(s.session_reused, None)
3930 s.connect((HOST, server.port))
3931 session = s.session
3932 self.assertTrue(session)
3933 with self.assertRaises(TypeError) as e:
3934 s.session = object
3935 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003936
Christian Heimesa170fa12017-09-15 20:27:30 +02003937 with client_context.wrap_socket(socket.socket(),
3938 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 s.connect((HOST, server.port))
3940 # cannot set session after handshake
3941 with self.assertRaises(ValueError) as e:
3942 s.session = session
3943 self.assertEqual(str(e.exception),
3944 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003945
Christian Heimesa170fa12017-09-15 20:27:30 +02003946 with client_context.wrap_socket(socket.socket(),
3947 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003948 # can set session before handshake and before the
3949 # connection was established
3950 s.session = session
3951 s.connect((HOST, server.port))
3952 self.assertEqual(s.session.id, session.id)
3953 self.assertEqual(s.session, session)
3954 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003955
Christian Heimesa170fa12017-09-15 20:27:30 +02003956 with client_context2.wrap_socket(socket.socket(),
3957 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003958 # cannot re-use session with a different SSLContext
3959 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003960 s.session = session
3961 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003962 self.assertEqual(str(e.exception),
3963 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003964
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003965
Thomas Woutersed03b412007-08-28 21:37:11 +00003966def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003967 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003968 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003969 plats = {
3970 'Linux': platform.linux_distribution,
3971 'Mac': platform.mac_ver,
3972 'Windows': platform.win32_ver,
3973 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003974 with warnings.catch_warnings():
3975 warnings.filterwarnings(
3976 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003977 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003978 'functions are deprecated .*',
3979 PendingDeprecationWarning,
3980 )
3981 for name, func in plats.items():
3982 plat = func()
3983 if plat and plat[0]:
3984 plat = '%s %r' % (name, plat)
3985 break
3986 else:
3987 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003988 print("test_ssl: testing with %r %r" %
3989 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3990 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003991 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003992 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3993 try:
3994 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3995 except AttributeError:
3996 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003997
Antoine Pitrou152efa22010-05-16 18:19:27 +00003998 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003999 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004000 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004001 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004002 BADCERT, BADKEY, EMPTYCERT]:
4003 if not os.path.exists(filename):
4004 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004005
Martin Panter3840b2a2016-03-27 01:53:46 +00004006 tests = [
4007 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004008 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00004009 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004010
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004011 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004012 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004013
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004014 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004015 try:
4016 support.run_unittest(*tests)
4017 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004018 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004019
4020if __name__ == "__main__":
4021 test_main()