blob: 9017ad57115609f66b808582f3c1e78cfb2af5da [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080058WRONG_CERT = data_file("wrongcert.pem")
Christian Heimesefff7062013-11-21 03:35:02 +010059
Christian Heimesbd5c7d22018-01-20 15:16:30 +010060CERTFILE_INFO = {
61 'issuer': ((('countryName', 'XY'),),
62 (('localityName', 'Castle Anthrax'),),
63 (('organizationName', 'Python Software Foundation'),),
64 (('commonName', 'localhost'),)),
65 'notAfter': 'Jan 17 19:09:06 2028 GMT',
66 'notBefore': 'Jan 19 19:09:06 2018 GMT',
67 'serialNumber': 'F9BA076D5B6ABD9B',
68 'subject': ((('countryName', 'XY'),),
69 (('localityName', 'Castle Anthrax'),),
70 (('organizationName', 'Python Software Foundation'),),
71 (('commonName', 'localhost'),)),
72 'subjectAltName': (('DNS', 'localhost'),),
73 'version': 3
74}
Antoine Pitrou152efa22010-05-16 18:19:27 +000075
Christian Heimes22587792013-11-21 23:56:13 +010076# empty CRL
77CRLFILE = data_file("revocation.crl")
78
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010079# Two keys and certs signed by the same CA (for SNI tests)
80SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020081SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010082
83SIGNED_CERTFILE_INFO = {
84 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
85 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
86 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
87 'issuer': ((('countryName', 'XY'),),
88 (('organizationName', 'Python Software Foundation CA'),),
89 (('commonName', 'our-ca-server'),)),
90 'notAfter': 'Nov 28 19:09:06 2027 GMT',
91 'notBefore': 'Jan 19 19:09:06 2018 GMT',
92 'serialNumber': '82EDBF41C880919C',
93 'subject': ((('countryName', 'XY'),),
94 (('localityName', 'Castle Anthrax'),),
95 (('organizationName', 'Python Software Foundation'),),
96 (('commonName', 'localhost'),)),
97 'subjectAltName': (('DNS', 'localhost'),),
98 'version': 3
99}
100
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100101SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200102SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100103SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
104SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
105
Martin Panter3840b2a2016-03-27 01:53:46 +0000106# Same certificate as pycacert.pem, but without extra text in file
107SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200108# cert with all kinds of subject alt names
109ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100110IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100111
Martin Panter3d81d932016-01-14 09:36:00 +0000112REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000113
114EMPTYCERT = data_file("nullcert.pem")
115BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000116NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000117BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200118NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200119NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000120
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400121DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100122BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Christian Heimes358cfd42016-09-10 22:43:48 +0200124# Not defined in all versions of OpenSSL
125OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
126OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
127OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
128OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800129OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200130
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100131
Thomas Woutersed03b412007-08-28 21:37:11 +0000132def handle_error(prefix):
133 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000134 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000135 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitroub5218772010-05-21 09:56:06 +0000137def can_clear_options():
138 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200139 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000140
141def no_sslv2_implies_sslv3_hello():
142 # 0.9.7h or higher
143 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
144
Christian Heimes2427b502013-11-23 11:24:32 +0100145def have_verify_flags():
146 # 0.9.8 or higher
147 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
148
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800149def _have_secp_curves():
150 if not ssl.HAS_ECDH:
151 return False
152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
153 try:
154 ctx.set_ecdh_curve("secp384r1")
155 except ValueError:
156 return False
157 else:
158 return True
159
160
161HAVE_SECP_CURVES = _have_secp_curves()
162
163
Antoine Pitrouc695c952014-04-28 20:57:36 +0200164def utc_offset(): #NOTE: ignore issues like #1647654
165 # local time = utc time + utc offset
166 if time.daylight and time.localtime().tm_isdst > 0:
167 return -time.altzone # seconds
168 return -time.timezone
169
Christian Heimes9424bb42013-06-17 15:32:57 +0200170def asn1time(cert_time):
171 # Some versions of OpenSSL ignore seconds, see #18207
172 # 0.9.8.i
173 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
174 fmt = "%b %d %H:%M:%S %Y GMT"
175 dt = datetime.datetime.strptime(cert_time, fmt)
176 dt = dt.replace(second=0)
177 cert_time = dt.strftime(fmt)
178 # %d adds leading zero but ASN1_TIME_print() uses leading space
179 if cert_time[4] == "0":
180 cert_time = cert_time[:4] + " " + cert_time[5:]
181
182 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
185def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200186 if hasattr(ssl, 'PROTOCOL_SSLv2'):
187 @functools.wraps(func)
188 def f(*args, **kwargs):
189 try:
190 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
191 except ssl.SSLError:
192 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
193 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
194 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
195 return func(*args, **kwargs)
196 return f
197 else:
198 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000199
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100200needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
201
Antoine Pitrou23df4832010-08-04 17:14:06 +0000202
Christian Heimesd0486372016-09-10 23:23:33 +0200203def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
204 cert_reqs=ssl.CERT_NONE, ca_certs=None,
205 ciphers=None, certfile=None, keyfile=None,
206 **kwargs):
207 context = ssl.SSLContext(ssl_version)
208 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200209 if cert_reqs == ssl.CERT_NONE:
210 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200211 context.verify_mode = cert_reqs
212 if ca_certs is not None:
213 context.load_verify_locations(ca_certs)
214 if certfile is not None or keyfile is not None:
215 context.load_cert_chain(certfile, keyfile)
216 if ciphers is not None:
217 context.set_ciphers(ciphers)
218 return context.wrap_socket(sock, **kwargs)
219
Christian Heimesa170fa12017-09-15 20:27:30 +0200220
221def testing_context(server_cert=SIGNED_CERTFILE):
222 """Create context
223
224 client_context, server_context, hostname = testing_context()
225 """
226 if server_cert == SIGNED_CERTFILE:
227 hostname = SIGNED_CERTFILE_HOSTNAME
228 elif server_cert == SIGNED_CERTFILE2:
229 hostname = SIGNED_CERTFILE2_HOSTNAME
230 else:
231 raise ValueError(server_cert)
232
233 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
234 client_context.load_verify_locations(SIGNING_CA)
235
236 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
237 server_context.load_cert_chain(server_cert)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800238 client_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200239
240 return client_context, server_context, hostname
241
242
Antoine Pitrou152efa22010-05-16 18:19:27 +0000243class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000244
Antoine Pitrou480a1242010-04-28 21:37:09 +0000245 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000246 ssl.CERT_NONE
247 ssl.CERT_OPTIONAL
248 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100249 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100250 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100251 if ssl.HAS_ECDH:
252 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100253 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
254 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000255 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100256 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700257 ssl.OP_NO_SSLv2
258 ssl.OP_NO_SSLv3
259 ssl.OP_NO_TLSv1
260 ssl.OP_NO_TLSv1_3
261 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
262 ssl.OP_NO_TLSv1_1
263 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200264 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000265
Christian Heimes89c20512018-02-27 11:17:32 +0100266 def test_private_init(self):
267 with self.assertRaisesRegex(TypeError, "public constructor"):
268 with socket.socket() as s:
269 ssl.SSLSocket(s)
270
Antoine Pitrou172f0252014-04-18 20:33:08 +0200271 def test_str_for_enums(self):
272 # Make sure that the PROTOCOL_* constants have enum-like string
273 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200274 proto = ssl.PROTOCOL_TLS
275 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200276 ctx = ssl.SSLContext(proto)
277 self.assertIs(ctx.protocol, proto)
278
Antoine Pitrou480a1242010-04-28 21:37:09 +0000279 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000281 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000282 sys.stdout.write("\n RAND_status is %d (%s)\n"
283 % (v, (v and "sufficient randomness") or
284 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200285
286 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
287 self.assertEqual(len(data), 16)
288 self.assertEqual(is_cryptographic, v == 1)
289 if v:
290 data = ssl.RAND_bytes(16)
291 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200292 else:
293 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200294
Victor Stinner1e81a392013-12-19 16:47:04 +0100295 # negative num is invalid
296 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
297 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
298
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100299 if hasattr(ssl, 'RAND_egd'):
300 self.assertRaises(TypeError, ssl.RAND_egd, 1)
301 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000302 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200303 ssl.RAND_add(b"this is a random bytes object", 75.0)
304 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000305
Christian Heimesf77b4b22013-08-21 13:26:05 +0200306 @unittest.skipUnless(os.name == 'posix', 'requires posix')
307 def test_random_fork(self):
308 status = ssl.RAND_status()
309 if not status:
310 self.fail("OpenSSL's PRNG has insufficient randomness")
311
312 rfd, wfd = os.pipe()
313 pid = os.fork()
314 if pid == 0:
315 try:
316 os.close(rfd)
317 child_random = ssl.RAND_pseudo_bytes(16)[0]
318 self.assertEqual(len(child_random), 16)
319 os.write(wfd, child_random)
320 os.close(wfd)
321 except BaseException:
322 os._exit(1)
323 else:
324 os._exit(0)
325 else:
326 os.close(wfd)
327 self.addCleanup(os.close, rfd)
328 _, status = os.waitpid(pid, 0)
329 self.assertEqual(status, 0)
330
331 child_random = os.read(rfd, 16)
332 self.assertEqual(len(child_random), 16)
333 parent_random = ssl.RAND_pseudo_bytes(16)[0]
334 self.assertEqual(len(parent_random), 16)
335
336 self.assertNotEqual(child_random, parent_random)
337
Antoine Pitrou480a1242010-04-28 21:37:09 +0000338 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000339 # note that this uses an 'unofficial' function in _ssl.c,
340 # provided solely for this test, to exercise the certificate
341 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100342 self.assertEqual(
343 ssl._ssl._test_decode_cert(CERTFILE),
344 CERTFILE_INFO
345 )
346 self.assertEqual(
347 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
348 SIGNED_CERTFILE_INFO
349 )
350
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200351 # Issue #13034: the subjectAltName in some certificates
352 # (notably projects.developer.nokia.com:443) wasn't parsed
353 p = ssl._ssl._test_decode_cert(NOKIACERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 self.assertEqual(p['subjectAltName'],
357 (('DNS', 'projects.developer.nokia.com'),
358 ('DNS', 'projects.forum.nokia.com'))
359 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100360 # extra OCSP and AIA fields
361 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
362 self.assertEqual(p['caIssuers'],
363 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
364 self.assertEqual(p['crlDistributionPoints'],
365 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000366
Christian Heimes824f7f32013-08-17 00:54:47 +0200367 def test_parse_cert_CVE_2013_4238(self):
368 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
369 if support.verbose:
370 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
371 subject = ((('countryName', 'US'),),
372 (('stateOrProvinceName', 'Oregon'),),
373 (('localityName', 'Beaverton'),),
374 (('organizationName', 'Python Software Foundation'),),
375 (('organizationalUnitName', 'Python Core Development'),),
376 (('commonName', 'null.python.org\x00example.org'),),
377 (('emailAddress', 'python-dev@python.org'),))
378 self.assertEqual(p['subject'], subject)
379 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200380 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
381 san = (('DNS', 'altnull.python.org\x00example.com'),
382 ('email', 'null@python.org\x00user@example.org'),
383 ('URI', 'http://null.python.org\x00http://example.org'),
384 ('IP Address', '192.0.2.1'),
385 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
386 else:
387 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
388 san = (('DNS', 'altnull.python.org\x00example.com'),
389 ('email', 'null@python.org\x00user@example.org'),
390 ('URI', 'http://null.python.org\x00http://example.org'),
391 ('IP Address', '192.0.2.1'),
392 ('IP Address', '<invalid>'))
393
394 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200395
Christian Heimes1c03abd2016-09-06 23:25:35 +0200396 def test_parse_all_sans(self):
397 p = ssl._ssl._test_decode_cert(ALLSANFILE)
398 self.assertEqual(p['subjectAltName'],
399 (
400 ('DNS', 'allsans'),
401 ('othername', '<unsupported>'),
402 ('othername', '<unsupported>'),
403 ('email', 'user@example.org'),
404 ('DNS', 'www.example.org'),
405 ('DirName',
406 ((('countryName', 'XY'),),
407 (('localityName', 'Castle Anthrax'),),
408 (('organizationName', 'Python Software Foundation'),),
409 (('commonName', 'dirname example'),))),
410 ('URI', 'https://www.python.org/'),
411 ('IP Address', '127.0.0.1'),
412 ('IP Address', '0:0:0:0:0:0:0:1\n'),
413 ('Registered ID', '1.2.3.4.5')
414 )
415 )
416
Antoine Pitrou480a1242010-04-28 21:37:09 +0000417 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000418 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000419 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000420 d1 = ssl.PEM_cert_to_DER_cert(pem)
421 p2 = ssl.DER_cert_to_PEM_cert(d1)
422 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000423 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000424 if not p2.startswith(ssl.PEM_HEADER + '\n'):
425 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
426 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
427 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000428
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000429 def test_openssl_version(self):
430 n = ssl.OPENSSL_VERSION_NUMBER
431 t = ssl.OPENSSL_VERSION_INFO
432 s = ssl.OPENSSL_VERSION
433 self.assertIsInstance(n, int)
434 self.assertIsInstance(t, tuple)
435 self.assertIsInstance(s, str)
436 # Some sanity checks follow
437 # >= 0.9
438 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400439 # < 3.0
440 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000441 major, minor, fix, patch, status = t
442 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400443 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000444 self.assertGreaterEqual(minor, 0)
445 self.assertLess(minor, 256)
446 self.assertGreaterEqual(fix, 0)
447 self.assertLess(fix, 256)
448 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100449 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 self.assertGreaterEqual(status, 0)
451 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400452 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200453 if IS_LIBRESSL:
454 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100455 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400456 else:
457 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100458 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000459
Antoine Pitrou9d543662010-04-23 23:10:32 +0000460 @support.cpython_only
461 def test_refcycle(self):
462 # Issue #7943: an SSL object doesn't create reference cycles with
463 # itself.
464 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200465 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000466 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100467 with support.check_warnings(("", ResourceWarning)):
468 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100469 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000470
Antoine Pitroua468adc2010-09-14 14:43:44 +0000471 def test_wrapped_unconnected(self):
472 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200473 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000474 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200475 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100476 self.assertRaises(OSError, ss.recv, 1)
477 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
478 self.assertRaises(OSError, ss.recvfrom, 1)
479 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
480 self.assertRaises(OSError, ss.send, b'x')
481 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800482 self.assertRaises(NotImplementedError, ss.sendmsg,
483 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000484
Antoine Pitrou40f08742010-04-24 22:04:40 +0000485 def test_timeout(self):
486 # Issue #8524: when creating an SSL socket, the timeout of the
487 # original socket should be retained.
488 for timeout in (None, 0.0, 5.0):
489 s = socket.socket(socket.AF_INET)
490 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200491 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100492 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000493
Christian Heimesd0486372016-09-10 23:23:33 +0200494 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000495 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000496 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000497 "certfile must be specified",
498 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000499 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000500 "certfile must be specified for server-side operations",
501 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000502 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000503 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200504 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100505 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
506 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200507 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200508 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000509 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000510 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000511 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200512 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000513 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000514 ssl.wrap_socket(sock,
515 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000516 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200517 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000518 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000519 ssl.wrap_socket(sock,
520 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000521 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000522
Martin Panter3464ea22016-02-01 21:58:11 +0000523 def bad_cert_test(self, certfile):
524 """Check that trying to use the given client certificate fails"""
525 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
526 certfile)
527 sock = socket.socket()
528 self.addCleanup(sock.close)
529 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200530 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200531 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000532
533 def test_empty_cert(self):
534 """Wrapping with an empty cert file"""
535 self.bad_cert_test("nullcert.pem")
536
537 def test_malformed_cert(self):
538 """Wrapping with a badly formatted certificate (syntax error)"""
539 self.bad_cert_test("badcert.pem")
540
541 def test_malformed_key(self):
542 """Wrapping with a badly formatted key (syntax error)"""
543 self.bad_cert_test("badkey.pem")
544
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 def test_match_hostname(self):
546 def ok(cert, hostname):
547 ssl.match_hostname(cert, hostname)
548 def fail(cert, hostname):
549 self.assertRaises(ssl.CertificateError,
550 ssl.match_hostname, cert, hostname)
551
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100552 # -- Hostname matching --
553
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000554 cert = {'subject': ((('commonName', 'example.com'),),)}
555 ok(cert, 'example.com')
556 ok(cert, 'ExAmple.cOm')
557 fail(cert, 'www.example.com')
558 fail(cert, '.example.com')
559 fail(cert, 'example.org')
560 fail(cert, 'exampleXcom')
561
562 cert = {'subject': ((('commonName', '*.a.com'),),)}
563 ok(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.a.com')
565 fail(cert, 'a.com')
566 fail(cert, 'Xa.com')
567 fail(cert, '.a.com')
568
Mandeep Singhede2ac92017-11-27 04:01:27 +0530569 # only match wildcards when they are the only thing
570 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000571 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530572 fail(cert, 'foo.com')
573 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000574 fail(cert, 'bar.com')
575 fail(cert, 'foo.a.com')
576 fail(cert, 'bar.foo.com')
577
Christian Heimes824f7f32013-08-17 00:54:47 +0200578 # NULL bytes are bad, CVE-2013-4073
579 cert = {'subject': ((('commonName',
580 'null.python.org\x00example.org'),),)}
581 ok(cert, 'null.python.org\x00example.org') # or raise an error?
582 fail(cert, 'example.org')
583 fail(cert, 'null.python.org')
584
Georg Brandl72c98d32013-10-27 07:16:53 +0100585 # error cases with wildcards
586 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
592 cert = {'subject': ((('commonName', 'a.*.com'),),)}
593 fail(cert, 'a.foo.com')
594 fail(cert, 'a..com')
595 fail(cert, 'a.com')
596
597 # wildcard doesn't match IDNA prefix 'xn--'
598 idna = 'püthon.python.org'.encode("idna").decode("ascii")
599 cert = {'subject': ((('commonName', idna),),)}
600 ok(cert, idna)
601 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
602 fail(cert, idna)
603 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
604 fail(cert, idna)
605
606 # wildcard in first fragment and IDNA A-labels in sequent fragments
607 # are supported.
608 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530610 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
611 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100612 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
613 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
614
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000615 # Slightly fake real-world example
616 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
617 'subject': ((('commonName', 'linuxfrz.org'),),),
618 'subjectAltName': (('DNS', 'linuxfr.org'),
619 ('DNS', 'linuxfr.com'),
620 ('othername', '<unsupported>'))}
621 ok(cert, 'linuxfr.org')
622 ok(cert, 'linuxfr.com')
623 # Not a "DNS" entry
624 fail(cert, '<unsupported>')
625 # When there is a subjectAltName, commonName isn't used
626 fail(cert, 'linuxfrz.org')
627
628 # A pristine real-world example
629 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
630 'subject': ((('countryName', 'US'),),
631 (('stateOrProvinceName', 'California'),),
632 (('localityName', 'Mountain View'),),
633 (('organizationName', 'Google Inc'),),
634 (('commonName', 'mail.google.com'),))}
635 ok(cert, 'mail.google.com')
636 fail(cert, 'gmail.com')
637 # Only commonName is considered
638 fail(cert, 'California')
639
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100640 # -- IPv4 matching --
641 cert = {'subject': ((('commonName', 'example.com'),),),
642 'subjectAltName': (('DNS', 'example.com'),
643 ('IP Address', '10.11.12.13'),
644 ('IP Address', '14.15.16.17'))}
645 ok(cert, '10.11.12.13')
646 ok(cert, '14.15.16.17')
647 fail(cert, '14.15.16.18')
648 fail(cert, 'example.net')
649
650 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800651 if hasattr(socket, 'AF_INET6'):
652 cert = {'subject': ((('commonName', 'example.com'),),),
653 'subjectAltName': (
654 ('DNS', 'example.com'),
655 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
656 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
657 ok(cert, '2001::cafe')
658 ok(cert, '2003::baba')
659 fail(cert, '2003::bebe')
660 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100661
662 # -- Miscellaneous --
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Neither commonName nor subjectAltName
665 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
666 'subject': ((('countryName', 'US'),),
667 (('stateOrProvinceName', 'California'),),
668 (('localityName', 'Mountain View'),),
669 (('organizationName', 'Google Inc'),))}
670 fail(cert, 'mail.google.com')
671
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200672 # No DNS entry in subjectAltName but a commonName
673 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
674 'subject': ((('countryName', 'US'),),
675 (('stateOrProvinceName', 'California'),),
676 (('localityName', 'Mountain View'),),
677 (('commonName', 'mail.google.com'),)),
678 'subjectAltName': (('othername', 'blabla'), )}
679 ok(cert, 'mail.google.com')
680
681 # No DNS entry subjectAltName and no commonName
682 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
683 'subject': ((('countryName', 'US'),),
684 (('stateOrProvinceName', 'California'),),
685 (('localityName', 'Mountain View'),),
686 (('organizationName', 'Google Inc'),)),
687 'subjectAltName': (('othername', 'blabla'),)}
688 fail(cert, 'google.com')
689
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000690 # Empty cert / no cert
691 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
692 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
693
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200694 # Issue #17980: avoid denials of service by refusing more than one
695 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800696 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "partial wildcards in leftmost label are not supported"):
700 ssl.match_hostname(cert, 'axxb.example.com')
701
702 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "wildcard can only be present in the leftmost label"):
706 ssl.match_hostname(cert, 'www.sub.example.com')
707
708 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 "too many wildcards"):
712 ssl.match_hostname(cert, 'axxbxxc.example.com')
713
714 cert = {'subject': ((('commonName', '*'),),)}
715 with self.assertRaisesRegex(
716 ssl.CertificateError,
717 "sole wildcard without additional labels are not support"):
718 ssl.match_hostname(cert, 'host')
719
720 cert = {'subject': ((('commonName', '*.com'),),)}
721 with self.assertRaisesRegex(
722 ssl.CertificateError,
723 r"hostname 'com' doesn't match '\*.com'"):
724 ssl.match_hostname(cert, 'com')
725
726 # extra checks for _inet_paton()
727 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
728 with self.assertRaises(ValueError):
729 ssl._inet_paton(invalid)
730 for ipaddr in ['127.0.0.1', '192.168.0.1']:
731 self.assertTrue(ssl._inet_paton(ipaddr))
732 if hasattr(socket, 'AF_INET6'):
733 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
734 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200735
Antoine Pitroud5323212010-10-22 18:19:07 +0000736 def test_server_side(self):
737 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200738 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000739 with socket.socket() as sock:
740 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
741 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000742
Antoine Pitroud6494802011-07-21 01:11:30 +0200743 def test_unknown_channel_binding(self):
744 # should raise ValueError for unknown type
745 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200746 s.bind(('127.0.0.1', 0))
747 s.listen()
748 c = socket.socket(socket.AF_INET)
749 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200750 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100751 with self.assertRaises(ValueError):
752 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200753 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
755 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
756 "'tls-unique' channel binding not available")
757 def test_tls_unique_channel_binding(self):
758 # unconnected should return None for known type
759 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200760 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100761 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200762 # the same for server-side
763 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200764 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100765 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200766
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600767 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200768 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600769 r = repr(ss)
770 with self.assertWarns(ResourceWarning) as cm:
771 ss = None
772 support.gc_collect()
773 self.assertIn(r, str(cm.warning.args[0]))
774
Christian Heimes6d7ad132013-06-09 18:02:55 +0200775 def test_get_default_verify_paths(self):
776 paths = ssl.get_default_verify_paths()
777 self.assertEqual(len(paths), 6)
778 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
779
780 with support.EnvironmentVarGuard() as env:
781 env["SSL_CERT_DIR"] = CAPATH
782 env["SSL_CERT_FILE"] = CERTFILE
783 paths = ssl.get_default_verify_paths()
784 self.assertEqual(paths.cafile, CERTFILE)
785 self.assertEqual(paths.capath, CAPATH)
786
Christian Heimes44109d72013-11-22 01:51:30 +0100787 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
788 def test_enum_certificates(self):
789 self.assertTrue(ssl.enum_certificates("CA"))
790 self.assertTrue(ssl.enum_certificates("ROOT"))
791
792 self.assertRaises(TypeError, ssl.enum_certificates)
793 self.assertRaises(WindowsError, ssl.enum_certificates, "")
794
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 trust_oids = set()
796 for storename in ("CA", "ROOT"):
797 store = ssl.enum_certificates(storename)
798 self.assertIsInstance(store, list)
799 for element in store:
800 self.assertIsInstance(element, tuple)
801 self.assertEqual(len(element), 3)
802 cert, enc, trust = element
803 self.assertIsInstance(cert, bytes)
804 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
805 self.assertIsInstance(trust, (set, bool))
806 if isinstance(trust, set):
807 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100808
809 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100810 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200811
Christian Heimes46bebee2013-06-09 19:03:31 +0200812 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100813 def test_enum_crls(self):
814 self.assertTrue(ssl.enum_crls("CA"))
815 self.assertRaises(TypeError, ssl.enum_crls)
816 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200817
Christian Heimes44109d72013-11-22 01:51:30 +0100818 crls = ssl.enum_crls("CA")
819 self.assertIsInstance(crls, list)
820 for element in crls:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 2)
823 self.assertIsInstance(element[0], bytes)
824 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100827 def test_asn1object(self):
828 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
829 '1.3.6.1.5.5.7.3.1')
830
831 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
832 self.assertEqual(val, expected)
833 self.assertEqual(val.nid, 129)
834 self.assertEqual(val.shortname, 'serverAuth')
835 self.assertEqual(val.longname, 'TLS Web Server Authentication')
836 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
837 self.assertIsInstance(val, ssl._ASN1Object)
838 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
839
840 val = ssl._ASN1Object.fromnid(129)
841 self.assertEqual(val, expected)
842 self.assertIsInstance(val, ssl._ASN1Object)
843 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100844 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
845 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100846 for i in range(1000):
847 try:
848 obj = ssl._ASN1Object.fromnid(i)
849 except ValueError:
850 pass
851 else:
852 self.assertIsInstance(obj.nid, int)
853 self.assertIsInstance(obj.shortname, str)
854 self.assertIsInstance(obj.longname, str)
855 self.assertIsInstance(obj.oid, (str, type(None)))
856
857 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
858 self.assertEqual(val, expected)
859 self.assertIsInstance(val, ssl._ASN1Object)
860 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
861 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
862 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100863 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
864 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100865
Christian Heimes72d28502013-11-23 13:56:58 +0100866 def test_purpose_enum(self):
867 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
868 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
873 '1.3.6.1.5.5.7.3.1')
874
875 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
876 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
881 '1.3.6.1.5.5.7.3.2')
882
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100883 def test_unsupported_dtls(self):
884 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
885 self.addCleanup(s.close)
886 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200887 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100888 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200889 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100890 with self.assertRaises(NotImplementedError) as cx:
891 ctx.wrap_socket(s)
892 self.assertEqual(str(cx.exception), "only stream sockets are supported")
893
Antoine Pitrouc695c952014-04-28 20:57:36 +0200894 def cert_time_ok(self, timestring, timestamp):
895 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
896
897 def cert_time_fail(self, timestring):
898 with self.assertRaises(ValueError):
899 ssl.cert_time_to_seconds(timestring)
900
901 @unittest.skipUnless(utc_offset(),
902 'local time needs to be different from UTC')
903 def test_cert_time_to_seconds_timezone(self):
904 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
905 # results if local timezone is not UTC
906 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
907 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
908
909 def test_cert_time_to_seconds(self):
910 timestring = "Jan 5 09:34:43 2018 GMT"
911 ts = 1515144883.0
912 self.cert_time_ok(timestring, ts)
913 # accept keyword parameter, assert its name
914 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
915 # accept both %e and %d (space or zero generated by strftime)
916 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
917 # case-insensitive
918 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
919 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
920 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
921 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
922 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
923 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
924 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
925 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
926
927 newyear_ts = 1230768000.0
928 # leap seconds
929 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
930 # same timestamp
931 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
932
933 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
934 # allow 60th second (even if it is not a leap second)
935 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
936 # allow 2nd leap second for compatibility with time.strptime()
937 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
938 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
939
Mike53f7a7c2017-12-14 14:04:53 +0300940 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200941 # 99991231235959Z (rfc 5280)
942 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
943
944 @support.run_with_locale('LC_ALL', '')
945 def test_cert_time_to_seconds_locale(self):
946 # `cert_time_to_seconds()` should be locale independent
947
948 def local_february_name():
949 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
950
951 if local_february_name().lower() == 'feb':
952 self.skipTest("locale-specific month name needs to be "
953 "different from C locale")
954
955 # locale-independent
956 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
957 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
958
Martin Panter3840b2a2016-03-27 01:53:46 +0000959 def test_connect_ex_error(self):
960 server = socket.socket(socket.AF_INET)
961 self.addCleanup(server.close)
962 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200963 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000964 cert_reqs=ssl.CERT_REQUIRED)
965 self.addCleanup(s.close)
966 rc = s.connect_ex((HOST, port))
967 # Issue #19919: Windows machines or VMs hosted on Windows
968 # machines sometimes return EWOULDBLOCK.
969 errors = (
970 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
971 errno.EWOULDBLOCK,
972 )
973 self.assertIn(rc, errors)
974
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100975
Antoine Pitrou152efa22010-05-16 18:19:27 +0000976class ContextTests(unittest.TestCase):
977
Antoine Pitrou23df4832010-08-04 17:14:06 +0000978 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000979 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100980 for protocol in PROTOCOLS:
981 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200982 ctx = ssl.SSLContext()
983 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000984 self.assertRaises(ValueError, ssl.SSLContext, -1)
985 self.assertRaises(ValueError, ssl.SSLContext, 42)
986
Antoine Pitrou23df4832010-08-04 17:14:06 +0000987 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000988 def test_protocol(self):
989 for proto in PROTOCOLS:
990 ctx = ssl.SSLContext(proto)
991 self.assertEqual(ctx.protocol, proto)
992
993 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.set_ciphers("ALL")
996 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000997 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000998 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999
Christian Heimes892d66e2018-01-29 14:10:18 +01001000 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1001 "Test applies only to Python default ciphers")
1002 def test_python_ciphers(self):
1003 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1004 ciphers = ctx.get_ciphers()
1005 for suite in ciphers:
1006 name = suite['name']
1007 self.assertNotIn("PSK", name)
1008 self.assertNotIn("SRP", name)
1009 self.assertNotIn("MD5", name)
1010 self.assertNotIn("RC4", name)
1011 self.assertNotIn("3DES", name)
1012
Christian Heimes25bfcd52016-09-06 00:04:45 +02001013 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1014 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001016 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001017 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001018 self.assertIn('AES256-GCM-SHA384', names)
1019 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001020
Antoine Pitrou23df4832010-08-04 17:14:06 +00001021 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001022 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001024 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001025 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001026 # SSLContext also enables these by default
1027 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001028 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1029 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001030 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001031 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001032 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001033 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001034 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1035 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001036 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001037 # Ubuntu has OP_NO_SSLv3 forced on by default
1038 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001039 else:
1040 with self.assertRaises(ValueError):
1041 ctx.options = 0
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 def test_verify_mode_protocol(self):
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 # Default value
1046 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1047 ctx.verify_mode = ssl.CERT_OPTIONAL
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1049 ctx.verify_mode = ssl.CERT_REQUIRED
1050 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1051 ctx.verify_mode = ssl.CERT_NONE
1052 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1053 with self.assertRaises(TypeError):
1054 ctx.verify_mode = None
1055 with self.assertRaises(ValueError):
1056 ctx.verify_mode = 42
1057
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 self.assertFalse(ctx.check_hostname)
1061
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1064 self.assertTrue(ctx.check_hostname)
1065
Christian Heimes61d478c2018-01-27 15:51:38 +01001066 def test_hostname_checks_common_name(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1068 self.assertTrue(ctx.hostname_checks_common_name)
1069 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1070 ctx.hostname_checks_common_name = True
1071 self.assertTrue(ctx.hostname_checks_common_name)
1072 ctx.hostname_checks_common_name = False
1073 self.assertFalse(ctx.hostname_checks_common_name)
1074 ctx.hostname_checks_common_name = True
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 else:
1077 with self.assertRaises(AttributeError):
1078 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001079
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001080 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1081 "required OpenSSL 1.1.0g")
1082 def test_min_max_version(self):
1083 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1084 self.assertEqual(
1085 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1086 )
1087 self.assertEqual(
1088 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1089 )
1090
1091 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1092 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1093 self.assertEqual(
1094 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1095 )
1096 self.assertEqual(
1097 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1098 )
1099
1100 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1101 ctx.maximum_version = ssl.TLSVersion.TLSv1
1102 self.assertEqual(
1103 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1104 )
1105 self.assertEqual(
1106 ctx.maximum_version, ssl.TLSVersion.TLSv1
1107 )
1108
1109 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1110 self.assertEqual(
1111 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1112 )
1113
1114 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1115 self.assertIn(
1116 ctx.maximum_version,
1117 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1118 )
1119
1120 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1121 self.assertIn(
1122 ctx.minimum_version,
1123 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1124 )
1125
1126 with self.assertRaises(ValueError):
1127 ctx.minimum_version = 42
1128
1129 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1130
1131 self.assertEqual(
1132 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1133 )
1134 self.assertEqual(
1135 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1136 )
1137 with self.assertRaises(ValueError):
1138 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1139 with self.assertRaises(ValueError):
1140 ctx.maximum_version = ssl.TLSVersion.TLSv1
1141
1142
Christian Heimes2427b502013-11-23 11:24:32 +01001143 @unittest.skipUnless(have_verify_flags(),
1144 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001145 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001146 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001147 # default value
1148 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1149 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001150 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1151 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1152 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1153 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1154 ctx.verify_flags = ssl.VERIFY_DEFAULT
1155 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1156 # supports any value
1157 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1158 self.assertEqual(ctx.verify_flags,
1159 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1160 with self.assertRaises(TypeError):
1161 ctx.verify_flags = None
1162
Antoine Pitrou152efa22010-05-16 18:19:27 +00001163 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001164 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001165 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001166 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001167 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1168 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001170 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001171 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001172 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001173 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001174 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001175 ctx.load_cert_chain(EMPTYCERT)
1176 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001177 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001178 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1179 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1180 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001181 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001182 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001183 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001184 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001185 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001186 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1187 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001188 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001189 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001190 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001191 # Password protected key and cert
1192 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1193 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1194 ctx.load_cert_chain(CERTFILE_PROTECTED,
1195 password=bytearray(KEY_PASSWORD.encode()))
1196 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1197 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1198 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1199 bytearray(KEY_PASSWORD.encode()))
1200 with self.assertRaisesRegex(TypeError, "should be a string"):
1201 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1202 with self.assertRaises(ssl.SSLError):
1203 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1204 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1205 # openssl has a fixed limit on the password buffer.
1206 # PEM_BUFSIZE is generally set to 1kb.
1207 # Return a string larger than this.
1208 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1209 # Password callback
1210 def getpass_unicode():
1211 return KEY_PASSWORD
1212 def getpass_bytes():
1213 return KEY_PASSWORD.encode()
1214 def getpass_bytearray():
1215 return bytearray(KEY_PASSWORD.encode())
1216 def getpass_badpass():
1217 return "badpass"
1218 def getpass_huge():
1219 return b'a' * (1024 * 1024)
1220 def getpass_bad_type():
1221 return 9
1222 def getpass_exception():
1223 raise Exception('getpass error')
1224 class GetPassCallable:
1225 def __call__(self):
1226 return KEY_PASSWORD
1227 def getpass(self):
1228 return KEY_PASSWORD
1229 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1230 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1232 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1233 ctx.load_cert_chain(CERTFILE_PROTECTED,
1234 password=GetPassCallable().getpass)
1235 with self.assertRaises(ssl.SSLError):
1236 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1237 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1238 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1239 with self.assertRaisesRegex(TypeError, "must return a string"):
1240 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1241 with self.assertRaisesRegex(Exception, "getpass error"):
1242 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1243 # Make sure the password function isn't called if it isn't needed
1244 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001245
1246 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001247 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001248 ctx.load_verify_locations(CERTFILE)
1249 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1250 ctx.load_verify_locations(BYTES_CERTFILE)
1251 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1252 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001253 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001254 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001255 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001256 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001257 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001258 ctx.load_verify_locations(BADCERT)
1259 ctx.load_verify_locations(CERTFILE, CAPATH)
1260 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1261
Victor Stinner80f75e62011-01-29 11:31:20 +00001262 # Issue #10989: crash if the second argument type is invalid
1263 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1264
Christian Heimesefff7062013-11-21 03:35:02 +01001265 def test_load_verify_cadata(self):
1266 # test cadata
1267 with open(CAFILE_CACERT) as f:
1268 cacert_pem = f.read()
1269 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1270 with open(CAFILE_NEURONIO) as f:
1271 neuronio_pem = f.read()
1272 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1273
1274 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001275 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001276 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1277 ctx.load_verify_locations(cadata=cacert_pem)
1278 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1279 ctx.load_verify_locations(cadata=neuronio_pem)
1280 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1281 # cert already in hash table
1282 ctx.load_verify_locations(cadata=neuronio_pem)
1283 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1284
1285 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001286 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001287 combined = "\n".join((cacert_pem, neuronio_pem))
1288 ctx.load_verify_locations(cadata=combined)
1289 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1290
1291 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001292 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001293 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1294 neuronio_pem, "tail"]
1295 ctx.load_verify_locations(cadata="\n".join(combined))
1296 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1297
1298 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001299 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001300 ctx.load_verify_locations(cadata=cacert_der)
1301 ctx.load_verify_locations(cadata=neuronio_der)
1302 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1303 # cert already in hash table
1304 ctx.load_verify_locations(cadata=cacert_der)
1305 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1306
1307 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001308 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001309 combined = b"".join((cacert_der, neuronio_der))
1310 ctx.load_verify_locations(cadata=combined)
1311 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1312
1313 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001314 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001315 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1316
1317 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1318 ctx.load_verify_locations(cadata="broken")
1319 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1320 ctx.load_verify_locations(cadata=b"broken")
1321
1322
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001323 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001324 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001325 ctx.load_dh_params(DHFILE)
1326 if os.name != 'nt':
1327 ctx.load_dh_params(BYTES_DHFILE)
1328 self.assertRaises(TypeError, ctx.load_dh_params)
1329 self.assertRaises(TypeError, ctx.load_dh_params, None)
1330 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001331 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001332 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001333 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001334 ctx.load_dh_params(CERTFILE)
1335
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001336 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001337 def test_session_stats(self):
1338 for proto in PROTOCOLS:
1339 ctx = ssl.SSLContext(proto)
1340 self.assertEqual(ctx.session_stats(), {
1341 'number': 0,
1342 'connect': 0,
1343 'connect_good': 0,
1344 'connect_renegotiate': 0,
1345 'accept': 0,
1346 'accept_good': 0,
1347 'accept_renegotiate': 0,
1348 'hits': 0,
1349 'misses': 0,
1350 'timeouts': 0,
1351 'cache_full': 0,
1352 })
1353
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001354 def test_set_default_verify_paths(self):
1355 # There's not much we can do to test that it acts as expected,
1356 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001357 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001358 ctx.set_default_verify_paths()
1359
Antoine Pitrou501da612011-12-21 09:27:41 +01001360 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001361 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001362 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001363 ctx.set_ecdh_curve("prime256v1")
1364 ctx.set_ecdh_curve(b"prime256v1")
1365 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1367 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1369
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001370 @needs_sni
1371 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001372 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001373
1374 # set_servername_callback expects a callable, or None
1375 self.assertRaises(TypeError, ctx.set_servername_callback)
1376 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1377 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1378 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1379
1380 def dummycallback(sock, servername, ctx):
1381 pass
1382 ctx.set_servername_callback(None)
1383 ctx.set_servername_callback(dummycallback)
1384
1385 @needs_sni
1386 def test_sni_callback_refcycle(self):
1387 # Reference cycles through the servername callback are detected
1388 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001389 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001390 def dummycallback(sock, servername, ctx, cycle=ctx):
1391 pass
1392 ctx.set_servername_callback(dummycallback)
1393 wr = weakref.ref(ctx)
1394 del ctx, dummycallback
1395 gc.collect()
1396 self.assertIs(wr(), None)
1397
Christian Heimes9a5395a2013-06-17 15:44:12 +02001398 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001399 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001400 self.assertEqual(ctx.cert_store_stats(),
1401 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1402 ctx.load_cert_chain(CERTFILE)
1403 self.assertEqual(ctx.cert_store_stats(),
1404 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1405 ctx.load_verify_locations(CERTFILE)
1406 self.assertEqual(ctx.cert_store_stats(),
1407 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001408 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001409 self.assertEqual(ctx.cert_store_stats(),
1410 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1411
1412 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001413 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001414 self.assertEqual(ctx.get_ca_certs(), [])
1415 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1416 ctx.load_verify_locations(CERTFILE)
1417 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001418 # but CAFILE_CACERT is a CA cert
1419 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001420 self.assertEqual(ctx.get_ca_certs(),
1421 [{'issuer': ((('organizationName', 'Root CA'),),
1422 (('organizationalUnitName', 'http://www.cacert.org'),),
1423 (('commonName', 'CA Cert Signing Authority'),),
1424 (('emailAddress', 'support@cacert.org'),)),
1425 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1426 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1427 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001428 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001429 'subject': ((('organizationName', 'Root CA'),),
1430 (('organizationalUnitName', 'http://www.cacert.org'),),
1431 (('commonName', 'CA Cert Signing Authority'),),
1432 (('emailAddress', 'support@cacert.org'),)),
1433 'version': 3}])
1434
Martin Panterb55f8b72016-01-14 12:53:56 +00001435 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001436 pem = f.read()
1437 der = ssl.PEM_cert_to_DER_cert(pem)
1438 self.assertEqual(ctx.get_ca_certs(True), [der])
1439
Christian Heimes72d28502013-11-23 13:56:58 +01001440 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001441 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001442 ctx.load_default_certs()
1443
Christian Heimesa170fa12017-09-15 20:27:30 +02001444 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001445 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1446 ctx.load_default_certs()
1447
Christian Heimesa170fa12017-09-15 20:27:30 +02001448 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001449 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1450
Christian Heimesa170fa12017-09-15 20:27:30 +02001451 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001452 self.assertRaises(TypeError, ctx.load_default_certs, None)
1453 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1454
Benjamin Peterson91244e02014-10-03 18:17:15 -04001455 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001456 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001457 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001458 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001459 with support.EnvironmentVarGuard() as env:
1460 env["SSL_CERT_DIR"] = CAPATH
1461 env["SSL_CERT_FILE"] = CERTFILE
1462 ctx.load_default_certs()
1463 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1464
Benjamin Peterson91244e02014-10-03 18:17:15 -04001465 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001466 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001467 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001469 ctx.load_default_certs()
1470 stats = ctx.cert_store_stats()
1471
Christian Heimesa170fa12017-09-15 20:27:30 +02001472 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001473 with support.EnvironmentVarGuard() as env:
1474 env["SSL_CERT_DIR"] = CAPATH
1475 env["SSL_CERT_FILE"] = CERTFILE
1476 ctx.load_default_certs()
1477 stats["x509"] += 1
1478 self.assertEqual(ctx.cert_store_stats(), stats)
1479
Christian Heimes358cfd42016-09-10 22:43:48 +02001480 def _assert_context_options(self, ctx):
1481 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1482 if OP_NO_COMPRESSION != 0:
1483 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1484 OP_NO_COMPRESSION)
1485 if OP_SINGLE_DH_USE != 0:
1486 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1487 OP_SINGLE_DH_USE)
1488 if OP_SINGLE_ECDH_USE != 0:
1489 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1490 OP_SINGLE_ECDH_USE)
1491 if OP_CIPHER_SERVER_PREFERENCE != 0:
1492 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1493 OP_CIPHER_SERVER_PREFERENCE)
1494
Christian Heimes4c05b472013-11-23 15:58:30 +01001495 def test_create_default_context(self):
1496 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001497
Christian Heimesa170fa12017-09-15 20:27:30 +02001498 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001499 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001500 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001501 self._assert_context_options(ctx)
1502
Christian Heimes4c05b472013-11-23 15:58:30 +01001503 with open(SIGNING_CA) as f:
1504 cadata = f.read()
1505 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1506 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001507 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001508 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001509 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001510
1511 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001512 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001513 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001514 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001515
Christian Heimes67986f92013-11-23 22:43:47 +01001516 def test__create_stdlib_context(self):
1517 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001518 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001519 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001520 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001521 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001522
1523 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1524 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1525 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001526 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001527
1528 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001529 cert_reqs=ssl.CERT_REQUIRED,
1530 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001531 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1532 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001533 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001534 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001535
1536 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001537 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001538 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001539 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
Christian Heimes1aa9a752013-12-02 02:41:19 +01001541 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001545
Christian Heimese82c0342017-09-15 20:29:57 +02001546 # Auto set CERT_REQUIRED
1547 ctx.check_hostname = True
1548 self.assertTrue(ctx.check_hostname)
1549 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1550 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001551 ctx.verify_mode = ssl.CERT_REQUIRED
1552 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001553 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001554
Christian Heimese82c0342017-09-15 20:29:57 +02001555 # Changing verify_mode does not affect check_hostname
1556 ctx.check_hostname = False
1557 ctx.verify_mode = ssl.CERT_NONE
1558 ctx.check_hostname = False
1559 self.assertFalse(ctx.check_hostname)
1560 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1561 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001562 ctx.check_hostname = True
1563 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001564 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1565
1566 ctx.check_hostname = False
1567 ctx.verify_mode = ssl.CERT_OPTIONAL
1568 ctx.check_hostname = False
1569 self.assertFalse(ctx.check_hostname)
1570 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1571 # keep CERT_OPTIONAL
1572 ctx.check_hostname = True
1573 self.assertTrue(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
1576 # Cannot set CERT_NONE with check_hostname enabled
1577 with self.assertRaises(ValueError):
1578 ctx.verify_mode = ssl.CERT_NONE
1579 ctx.check_hostname = False
1580 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001581 ctx.verify_mode = ssl.CERT_NONE
1582 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001583
Christian Heimes5fe668c2016-09-12 00:01:11 +02001584 def test_context_client_server(self):
1585 # PROTOCOL_TLS_CLIENT has sane defaults
1586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1587 self.assertTrue(ctx.check_hostname)
1588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 # PROTOCOL_TLS_SERVER has different but also sane defaults
1591 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1592 self.assertFalse(ctx.check_hostname)
1593 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1594
Christian Heimes4df60f12017-09-15 20:26:05 +02001595 def test_context_custom_class(self):
1596 class MySSLSocket(ssl.SSLSocket):
1597 pass
1598
1599 class MySSLObject(ssl.SSLObject):
1600 pass
1601
1602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1603 ctx.sslsocket_class = MySSLSocket
1604 ctx.sslobject_class = MySSLObject
1605
1606 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1607 self.assertIsInstance(sock, MySSLSocket)
1608 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1609 self.assertIsInstance(obj, MySSLObject)
1610
Antoine Pitrou152efa22010-05-16 18:19:27 +00001611
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001612class SSLErrorTests(unittest.TestCase):
1613
1614 def test_str(self):
1615 # The str() of a SSLError doesn't include the errno
1616 e = ssl.SSLError(1, "foo")
1617 self.assertEqual(str(e), "foo")
1618 self.assertEqual(e.errno, 1)
1619 # Same for a subclass
1620 e = ssl.SSLZeroReturnError(1, "foo")
1621 self.assertEqual(str(e), "foo")
1622 self.assertEqual(e.errno, 1)
1623
1624 def test_lib_reason(self):
1625 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001626 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001627 with self.assertRaises(ssl.SSLError) as cm:
1628 ctx.load_dh_params(CERTFILE)
1629 self.assertEqual(cm.exception.library, 'PEM')
1630 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1631 s = str(cm.exception)
1632 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1633
1634 def test_subclass(self):
1635 # Check that the appropriate SSLError subclass is raised
1636 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001637 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1638 ctx.check_hostname = False
1639 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001640 with socket.socket() as s:
1641 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001642 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001643 c = socket.socket()
1644 c.connect(s.getsockname())
1645 c.setblocking(False)
1646 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001647 with self.assertRaises(ssl.SSLWantReadError) as cm:
1648 c.do_handshake()
1649 s = str(cm.exception)
1650 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1651 # For compatibility
1652 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1653
1654
Christian Heimes61d478c2018-01-27 15:51:38 +01001655 def test_bad_server_hostname(self):
1656 ctx = ssl.create_default_context()
1657 with self.assertRaises(ValueError):
1658 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1659 server_hostname="")
1660 with self.assertRaises(ValueError):
1661 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1662 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001663 with self.assertRaises(TypeError):
1664 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1665 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001666
1667
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001668class MemoryBIOTests(unittest.TestCase):
1669
1670 def test_read_write(self):
1671 bio = ssl.MemoryBIO()
1672 bio.write(b'foo')
1673 self.assertEqual(bio.read(), b'foo')
1674 self.assertEqual(bio.read(), b'')
1675 bio.write(b'foo')
1676 bio.write(b'bar')
1677 self.assertEqual(bio.read(), b'foobar')
1678 self.assertEqual(bio.read(), b'')
1679 bio.write(b'baz')
1680 self.assertEqual(bio.read(2), b'ba')
1681 self.assertEqual(bio.read(1), b'z')
1682 self.assertEqual(bio.read(1), b'')
1683
1684 def test_eof(self):
1685 bio = ssl.MemoryBIO()
1686 self.assertFalse(bio.eof)
1687 self.assertEqual(bio.read(), b'')
1688 self.assertFalse(bio.eof)
1689 bio.write(b'foo')
1690 self.assertFalse(bio.eof)
1691 bio.write_eof()
1692 self.assertFalse(bio.eof)
1693 self.assertEqual(bio.read(2), b'fo')
1694 self.assertFalse(bio.eof)
1695 self.assertEqual(bio.read(1), b'o')
1696 self.assertTrue(bio.eof)
1697 self.assertEqual(bio.read(), b'')
1698 self.assertTrue(bio.eof)
1699
1700 def test_pending(self):
1701 bio = ssl.MemoryBIO()
1702 self.assertEqual(bio.pending, 0)
1703 bio.write(b'foo')
1704 self.assertEqual(bio.pending, 3)
1705 for i in range(3):
1706 bio.read(1)
1707 self.assertEqual(bio.pending, 3-i-1)
1708 for i in range(3):
1709 bio.write(b'x')
1710 self.assertEqual(bio.pending, i+1)
1711 bio.read()
1712 self.assertEqual(bio.pending, 0)
1713
1714 def test_buffer_types(self):
1715 bio = ssl.MemoryBIO()
1716 bio.write(b'foo')
1717 self.assertEqual(bio.read(), b'foo')
1718 bio.write(bytearray(b'bar'))
1719 self.assertEqual(bio.read(), b'bar')
1720 bio.write(memoryview(b'baz'))
1721 self.assertEqual(bio.read(), b'baz')
1722
1723 def test_error_types(self):
1724 bio = ssl.MemoryBIO()
1725 self.assertRaises(TypeError, bio.write, 'foo')
1726 self.assertRaises(TypeError, bio.write, None)
1727 self.assertRaises(TypeError, bio.write, True)
1728 self.assertRaises(TypeError, bio.write, 1)
1729
1730
Christian Heimes89c20512018-02-27 11:17:32 +01001731class SSLObjectTests(unittest.TestCase):
1732 def test_private_init(self):
1733 bio = ssl.MemoryBIO()
1734 with self.assertRaisesRegex(TypeError, "public constructor"):
1735 ssl.SSLObject(bio, bio)
1736
1737
Martin Panter3840b2a2016-03-27 01:53:46 +00001738class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001739 """Tests that connect to a simple server running in the background"""
1740
1741 def setUp(self):
1742 server = ThreadedEchoServer(SIGNED_CERTFILE)
1743 self.server_addr = (HOST, server.port)
1744 server.__enter__()
1745 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001746
Antoine Pitrou480a1242010-04-28 21:37:09 +00001747 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001748 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001749 cert_reqs=ssl.CERT_NONE) as s:
1750 s.connect(self.server_addr)
1751 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001752 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001753
Martin Panter3840b2a2016-03-27 01:53:46 +00001754 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001755 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001756 cert_reqs=ssl.CERT_REQUIRED,
1757 ca_certs=SIGNING_CA) as s:
1758 s.connect(self.server_addr)
1759 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001760 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001761
Martin Panter3840b2a2016-03-27 01:53:46 +00001762 def test_connect_fail(self):
1763 # This should fail because we have no verification certs. Connection
1764 # failure crashes ThreadedEchoServer, so run this in an independent
1765 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001766 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001767 cert_reqs=ssl.CERT_REQUIRED)
1768 self.addCleanup(s.close)
1769 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1770 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001771
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001772 def test_connect_ex(self):
1773 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001774 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001775 cert_reqs=ssl.CERT_REQUIRED,
1776 ca_certs=SIGNING_CA)
1777 self.addCleanup(s.close)
1778 self.assertEqual(0, s.connect_ex(self.server_addr))
1779 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001780
1781 def test_non_blocking_connect_ex(self):
1782 # Issue #11326: non-blocking connect_ex() should allow handshake
1783 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001784 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 cert_reqs=ssl.CERT_REQUIRED,
1786 ca_certs=SIGNING_CA,
1787 do_handshake_on_connect=False)
1788 self.addCleanup(s.close)
1789 s.setblocking(False)
1790 rc = s.connect_ex(self.server_addr)
1791 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1792 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1793 # Wait for connect to finish
1794 select.select([], [s], [], 5.0)
1795 # Non-blocking handshake
1796 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001797 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001798 s.do_handshake()
1799 break
1800 except ssl.SSLWantReadError:
1801 select.select([s], [], [], 5.0)
1802 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001803 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001804 # SSL established
1805 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001806
Antoine Pitrou152efa22010-05-16 18:19:27 +00001807 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001808 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001809 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001810 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1811 s.connect(self.server_addr)
1812 self.assertEqual({}, s.getpeercert())
1813 # Same with a server hostname
1814 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1815 server_hostname="dummy") as s:
1816 s.connect(self.server_addr)
1817 ctx.verify_mode = ssl.CERT_REQUIRED
1818 # This should succeed because we specify the root cert
1819 ctx.load_verify_locations(SIGNING_CA)
1820 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1821 s.connect(self.server_addr)
1822 cert = s.getpeercert()
1823 self.assertTrue(cert)
1824
1825 def test_connect_with_context_fail(self):
1826 # This should fail because we have no verification certs. Connection
1827 # failure crashes ThreadedEchoServer, so run this in an independent
1828 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001829 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001830 ctx.verify_mode = ssl.CERT_REQUIRED
1831 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1832 self.addCleanup(s.close)
1833 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1834 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001835
1836 def test_connect_capath(self):
1837 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001838 # NOTE: the subject hashing algorithm has been changed between
1839 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1840 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001841 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001842 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 ctx.verify_mode = ssl.CERT_REQUIRED
1844 ctx.load_verify_locations(capath=CAPATH)
1845 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1846 s.connect(self.server_addr)
1847 cert = s.getpeercert()
1848 self.assertTrue(cert)
1849 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001850 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001851 ctx.verify_mode = ssl.CERT_REQUIRED
1852 ctx.load_verify_locations(capath=BYTES_CAPATH)
1853 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1854 s.connect(self.server_addr)
1855 cert = s.getpeercert()
1856 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001857
Christian Heimesefff7062013-11-21 03:35:02 +01001858 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001859 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001860 pem = f.read()
1861 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001862 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001863 ctx.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001864 # TODO: fix TLSv1.3 support
1865 ctx.options |= ssl.OP_NO_TLSv1_3
Martin Panter3840b2a2016-03-27 01:53:46 +00001866 ctx.load_verify_locations(cadata=pem)
1867 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1868 s.connect(self.server_addr)
1869 cert = s.getpeercert()
1870 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001871
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001873 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001874 ctx.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001875 # TODO: fix TLSv1.3 support
1876 ctx.options |= ssl.OP_NO_TLSv1_3
Martin Panter3840b2a2016-03-27 01:53:46 +00001877 ctx.load_verify_locations(cadata=der)
1878 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1879 s.connect(self.server_addr)
1880 cert = s.getpeercert()
1881 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001882
Antoine Pitroue3220242010-04-24 11:13:53 +00001883 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1884 def test_makefile_close(self):
1885 # Issue #5238: creating a file-like object with makefile() shouldn't
1886 # delay closing the underlying "real socket" (here tested with its
1887 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001888 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001889 ss.connect(self.server_addr)
1890 fd = ss.fileno()
1891 f = ss.makefile()
1892 f.close()
1893 # The fd is still open
1894 os.read(fd, 0)
1895 # Closing the SSL socket should close the fd too
1896 ss.close()
1897 gc.collect()
1898 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001899 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001900 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001901
Antoine Pitrou480a1242010-04-28 21:37:09 +00001902 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001903 s = socket.socket(socket.AF_INET)
1904 s.connect(self.server_addr)
1905 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001906 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001907 cert_reqs=ssl.CERT_NONE,
1908 do_handshake_on_connect=False)
1909 self.addCleanup(s.close)
1910 count = 0
1911 while True:
1912 try:
1913 count += 1
1914 s.do_handshake()
1915 break
1916 except ssl.SSLWantReadError:
1917 select.select([s], [], [])
1918 except ssl.SSLWantWriteError:
1919 select.select([], [s], [])
1920 if support.verbose:
1921 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001922
Antoine Pitrou480a1242010-04-28 21:37:09 +00001923 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001925
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 def test_get_server_certificate_fail(self):
1927 # Connection failure crashes ThreadedEchoServer, so run this in an
1928 # independent test method
1929 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001930
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001931 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001932 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001933 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1934 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001935 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1937 s.connect(self.server_addr)
1938 # Error checking can happen at instantiation or when connecting
1939 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1940 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001941 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1943 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001944
Christian Heimes9a5395a2013-06-17 15:44:12 +02001945 def test_get_ca_certs_capath(self):
1946 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001947 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001948 ctx.load_verify_locations(capath=CAPATH)
1949 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001950 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1951 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001952 s.connect(self.server_addr)
1953 cert = s.getpeercert()
1954 self.assertTrue(cert)
1955 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001956
Christian Heimes575596e2013-12-15 21:49:17 +01001957 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001958 def test_context_setget(self):
1959 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001960 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1961 ctx1.load_verify_locations(capath=CAPATH)
1962 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1963 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001964 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001965 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001966 ss.connect(self.server_addr)
1967 self.assertIs(ss.context, ctx1)
1968 self.assertIs(ss._sslobj.context, ctx1)
1969 ss.context = ctx2
1970 self.assertIs(ss.context, ctx2)
1971 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001972
1973 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1974 # A simple IO loop. Call func(*args) depending on the error we get
1975 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1976 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001977 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001978 count = 0
1979 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001980 if time.monotonic() > deadline:
1981 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001982 errno = None
1983 count += 1
1984 try:
1985 ret = func(*args)
1986 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001987 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001988 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001989 raise
1990 errno = e.errno
1991 # Get any data from the outgoing BIO irrespective of any error, and
1992 # send it to the socket.
1993 buf = outgoing.read()
1994 sock.sendall(buf)
1995 # If there's no error, we're done. For WANT_READ, we need to get
1996 # data from the socket and put it in the incoming BIO.
1997 if errno is None:
1998 break
1999 elif errno == ssl.SSL_ERROR_WANT_READ:
2000 buf = sock.recv(32768)
2001 if buf:
2002 incoming.write(buf)
2003 else:
2004 incoming.write_eof()
2005 if support.verbose:
2006 sys.stdout.write("Needed %d calls to complete %s().\n"
2007 % (count, func.__name__))
2008 return ret
2009
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 def test_bio_handshake(self):
2011 sock = socket.socket(socket.AF_INET)
2012 self.addCleanup(sock.close)
2013 sock.connect(self.server_addr)
2014 incoming = ssl.MemoryBIO()
2015 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002016 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2017 self.assertTrue(ctx.check_hostname)
2018 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002019 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002020 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2021 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002022 self.assertIs(sslobj._sslobj.owner, sslobj)
2023 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002024 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002025 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002026 self.assertRaises(ValueError, sslobj.getpeercert)
2027 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2028 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2029 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2030 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002031 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002032 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002033 self.assertTrue(sslobj.getpeercert())
2034 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2035 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2036 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002037 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002038 except ssl.SSLSyscallError:
2039 # If the server shuts down the TCP connection without sending a
2040 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2041 pass
2042 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2043
2044 def test_bio_read_write_data(self):
2045 sock = socket.socket(socket.AF_INET)
2046 self.addCleanup(sock.close)
2047 sock.connect(self.server_addr)
2048 incoming = ssl.MemoryBIO()
2049 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002051 ctx.verify_mode = ssl.CERT_NONE
2052 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2053 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2054 req = b'FOO\n'
2055 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2056 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2057 self.assertEqual(buf, b'foo\n')
2058 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002059
2060
Martin Panter3840b2a2016-03-27 01:53:46 +00002061class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002062
Martin Panter3840b2a2016-03-27 01:53:46 +00002063 def test_timeout_connect_ex(self):
2064 # Issue #12065: on a timeout, connect_ex() should return the original
2065 # errno (mimicking the behaviour of non-SSL sockets).
2066 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002067 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002068 cert_reqs=ssl.CERT_REQUIRED,
2069 do_handshake_on_connect=False)
2070 self.addCleanup(s.close)
2071 s.settimeout(0.0000001)
2072 rc = s.connect_ex((REMOTE_HOST, 443))
2073 if rc == 0:
2074 self.skipTest("REMOTE_HOST responded too quickly")
2075 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2076
2077 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2078 def test_get_server_certificate_ipv6(self):
2079 with support.transient_internet('ipv6.google.com'):
2080 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2081 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2082
Martin Panter3840b2a2016-03-27 01:53:46 +00002083
2084def _test_get_server_certificate(test, host, port, cert=None):
2085 pem = ssl.get_server_certificate((host, port))
2086 if not pem:
2087 test.fail("No server certificate on %s:%s!" % (host, port))
2088
2089 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2090 if not pem:
2091 test.fail("No server certificate on %s:%s!" % (host, port))
2092 if support.verbose:
2093 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2094
2095def _test_get_server_certificate_fail(test, host, port):
2096 try:
2097 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2098 except ssl.SSLError as x:
2099 #should fail
2100 if support.verbose:
2101 sys.stdout.write("%s\n" % x)
2102 else:
2103 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2104
2105
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002106from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002107
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002108class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002109
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002110 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002111
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002112 """A mildly complicated class, because we want it to work both
2113 with and without the SSL wrapper around the socket connection, so
2114 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002115
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002116 def __init__(self, server, connsock, addr):
2117 self.server = server
2118 self.running = False
2119 self.sock = connsock
2120 self.addr = addr
2121 self.sock.setblocking(1)
2122 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002123 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002124 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002125
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002126 def wrap_conn(self):
2127 try:
2128 self.sslconn = self.server.context.wrap_socket(
2129 self.sock, server_side=True)
2130 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2131 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
2132 except (ssl.SSLError, ConnectionResetError, OSError) as e:
2133 # We treat ConnectionResetError as though it were an
2134 # SSLError - OpenSSL on Ubuntu abruptly closes the
2135 # connection when asked to use an unsupported protocol.
2136 #
2137 # OSError may occur with wrong protocols, e.g. both
2138 # sides use PROTOCOL_TLS_SERVER.
2139 #
2140 # XXX Various errors can have happened here, for example
2141 # a mismatching protocol version, an invalid certificate,
2142 # or a low-level bug. This should be made more discriminating.
2143 #
2144 # bpo-31323: Store the exception as string to prevent
2145 # a reference leak: server -> conn_errors -> exception
2146 # -> traceback -> self (ConnectionHandler) -> server
2147 self.server.conn_errors.append(str(e))
2148 if self.server.chatty:
2149 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2150 self.running = False
2151 self.server.stop()
2152 self.close()
2153 return False
2154 else:
2155 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2156 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2157 cert = self.sslconn.getpeercert()
2158 if support.verbose and self.server.chatty:
2159 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2160 cert_binary = self.sslconn.getpeercert(True)
2161 if support.verbose and self.server.chatty:
2162 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2163 cipher = self.sslconn.cipher()
2164 if support.verbose and self.server.chatty:
2165 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2166 sys.stdout.write(" server: selected protocol is now "
2167 + str(self.sslconn.selected_npn_protocol()) + "\n")
2168 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002169
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002170 def read(self):
2171 if self.sslconn:
2172 return self.sslconn.read()
2173 else:
2174 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002175
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002176 def write(self, bytes):
2177 if self.sslconn:
2178 return self.sslconn.write(bytes)
2179 else:
2180 return self.sock.send(bytes)
2181
2182 def close(self):
2183 if self.sslconn:
2184 self.sslconn.close()
2185 else:
2186 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002187
Antoine Pitrou480a1242010-04-28 21:37:09 +00002188 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002189 self.running = True
2190 if not self.server.starttls_server:
2191 if not self.wrap_conn():
2192 return
2193 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002194 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002195 msg = self.read()
2196 stripped = msg.strip()
2197 if not stripped:
2198 # eof, so quit this handler
2199 self.running = False
2200 try:
2201 self.sock = self.sslconn.unwrap()
2202 except OSError:
2203 # Many tests shut the TCP connection down
2204 # without an SSL shutdown. This causes
2205 # unwrap() to raise OSError with errno=0!
2206 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002207 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002208 self.sslconn = None
2209 self.close()
2210 elif stripped == b'over':
2211 if support.verbose and self.server.connectionchatty:
2212 sys.stdout.write(" server: client closed connection\n")
2213 self.close()
2214 return
2215 elif (self.server.starttls_server and
2216 stripped == b'STARTTLS'):
2217 if support.verbose and self.server.connectionchatty:
2218 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2219 self.write(b"OK\n")
2220 if not self.wrap_conn():
2221 return
2222 elif (self.server.starttls_server and self.sslconn
2223 and stripped == b'ENDTLS'):
2224 if support.verbose and self.server.connectionchatty:
2225 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2226 self.write(b"OK\n")
2227 self.sock = self.sslconn.unwrap()
2228 self.sslconn = None
2229 if support.verbose and self.server.connectionchatty:
2230 sys.stdout.write(" server: connection is now unencrypted...\n")
2231 elif stripped == b'CB tls-unique':
2232 if support.verbose and self.server.connectionchatty:
2233 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2234 data = self.sslconn.get_channel_binding("tls-unique")
2235 self.write(repr(data).encode("us-ascii") + b"\n")
2236 else:
2237 if (support.verbose and
2238 self.server.connectionchatty):
2239 ctype = (self.sslconn and "encrypted") or "unencrypted"
2240 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2241 % (msg, ctype, msg.lower(), ctype))
2242 self.write(msg.lower())
2243 except OSError:
2244 if self.server.chatty:
2245 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002246 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002247 self.running = False
2248 # normally, we'd just stop here, but for the test
2249 # harness, we want to stop the server
2250 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002251
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002252 def __init__(self, certificate=None, ssl_version=None,
2253 certreqs=None, cacerts=None,
2254 chatty=True, connectionchatty=False, starttls_server=False,
2255 npn_protocols=None, alpn_protocols=None,
2256 ciphers=None, context=None):
2257 if context:
2258 self.context = context
2259 else:
2260 self.context = ssl.SSLContext(ssl_version
2261 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002262 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002263 self.context.verify_mode = (certreqs if certreqs is not None
2264 else ssl.CERT_NONE)
2265 if cacerts:
2266 self.context.load_verify_locations(cacerts)
2267 if certificate:
2268 self.context.load_cert_chain(certificate)
2269 if npn_protocols:
2270 self.context.set_npn_protocols(npn_protocols)
2271 if alpn_protocols:
2272 self.context.set_alpn_protocols(alpn_protocols)
2273 if ciphers:
2274 self.context.set_ciphers(ciphers)
2275 self.chatty = chatty
2276 self.connectionchatty = connectionchatty
2277 self.starttls_server = starttls_server
2278 self.sock = socket.socket()
2279 self.port = support.bind_port(self.sock)
2280 self.flag = None
2281 self.active = False
2282 self.selected_npn_protocols = []
2283 self.selected_alpn_protocols = []
2284 self.shared_ciphers = []
2285 self.conn_errors = []
2286 threading.Thread.__init__(self)
2287 self.daemon = True
2288
2289 def __enter__(self):
2290 self.start(threading.Event())
2291 self.flag.wait()
2292 return self
2293
2294 def __exit__(self, *args):
2295 self.stop()
2296 self.join()
2297
2298 def start(self, flag=None):
2299 self.flag = flag
2300 threading.Thread.start(self)
2301
2302 def run(self):
2303 self.sock.settimeout(0.05)
2304 self.sock.listen()
2305 self.active = True
2306 if self.flag:
2307 # signal an event
2308 self.flag.set()
2309 while self.active:
2310 try:
2311 newconn, connaddr = self.sock.accept()
2312 if support.verbose and self.chatty:
2313 sys.stdout.write(' server: new connection from '
2314 + repr(connaddr) + '\n')
2315 handler = self.ConnectionHandler(self, newconn, connaddr)
2316 handler.start()
2317 handler.join()
2318 except socket.timeout:
2319 pass
2320 except KeyboardInterrupt:
2321 self.stop()
2322 self.sock.close()
2323
2324 def stop(self):
2325 self.active = False
2326
2327class AsyncoreEchoServer(threading.Thread):
2328
2329 # this one's based on asyncore.dispatcher
2330
2331 class EchoServer (asyncore.dispatcher):
2332
2333 class ConnectionHandler(asyncore.dispatcher_with_send):
2334
2335 def __init__(self, conn, certfile):
2336 self.socket = test_wrap_socket(conn, server_side=True,
2337 certfile=certfile,
2338 do_handshake_on_connect=False)
2339 asyncore.dispatcher_with_send.__init__(self, self.socket)
2340 self._ssl_accepting = True
2341 self._do_ssl_handshake()
2342
2343 def readable(self):
2344 if isinstance(self.socket, ssl.SSLSocket):
2345 while self.socket.pending() > 0:
2346 self.handle_read_event()
2347 return True
2348
2349 def _do_ssl_handshake(self):
2350 try:
2351 self.socket.do_handshake()
2352 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2353 return
2354 except ssl.SSLEOFError:
2355 return self.handle_close()
2356 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002357 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002358 except OSError as err:
2359 if err.args[0] == errno.ECONNABORTED:
2360 return self.handle_close()
2361 else:
2362 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002363
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002364 def handle_read(self):
2365 if self._ssl_accepting:
2366 self._do_ssl_handshake()
2367 else:
2368 data = self.recv(1024)
2369 if support.verbose:
2370 sys.stdout.write(" server: read %s from client\n" % repr(data))
2371 if not data:
2372 self.close()
2373 else:
2374 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002375
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002376 def handle_close(self):
2377 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002378 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002379 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002380
2381 def handle_error(self):
2382 raise
2383
Trent Nelson78520002008-04-10 20:54:35 +00002384 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002385 self.certfile = certfile
2386 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2387 self.port = support.bind_port(sock, '')
2388 asyncore.dispatcher.__init__(self, sock)
2389 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002390
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002391 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002392 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002393 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2394 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002395
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002396 def handle_error(self):
2397 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002398
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002399 def __init__(self, certfile):
2400 self.flag = None
2401 self.active = False
2402 self.server = self.EchoServer(certfile)
2403 self.port = self.server.port
2404 threading.Thread.__init__(self)
2405 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002406
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002407 def __str__(self):
2408 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002409
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002410 def __enter__(self):
2411 self.start(threading.Event())
2412 self.flag.wait()
2413 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002414
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002415 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002416 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002417 sys.stdout.write(" cleanup: stopping server.\n")
2418 self.stop()
2419 if support.verbose:
2420 sys.stdout.write(" cleanup: joining server thread.\n")
2421 self.join()
2422 if support.verbose:
2423 sys.stdout.write(" cleanup: successfully joined.\n")
2424 # make sure that ConnectionHandler is removed from socket_map
2425 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002426
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002427 def start (self, flag=None):
2428 self.flag = flag
2429 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002430
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002431 def run(self):
2432 self.active = True
2433 if self.flag:
2434 self.flag.set()
2435 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002436 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002437 asyncore.loop(1)
2438 except:
2439 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002440
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 def stop(self):
2442 self.active = False
2443 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002444
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002445def server_params_test(client_context, server_context, indata=b"FOO\n",
2446 chatty=True, connectionchatty=False, sni_name=None,
2447 session=None):
2448 """
2449 Launch a server, connect a client to it and try various reads
2450 and writes.
2451 """
2452 stats = {}
2453 server = ThreadedEchoServer(context=server_context,
2454 chatty=chatty,
2455 connectionchatty=False)
2456 with server:
2457 with client_context.wrap_socket(socket.socket(),
2458 server_hostname=sni_name, session=session) as s:
2459 s.connect((HOST, server.port))
2460 for arg in [indata, bytearray(indata), memoryview(indata)]:
2461 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002462 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002463 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002464 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002465 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002466 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002467 if connectionchatty:
2468 if support.verbose:
2469 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002470 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002471 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002472 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2473 % (outdata[:20], len(outdata),
2474 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 s.write(b"over\n")
2476 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002477 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002478 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 stats.update({
2480 'compression': s.compression(),
2481 'cipher': s.cipher(),
2482 'peercert': s.getpeercert(),
2483 'client_alpn_protocol': s.selected_alpn_protocol(),
2484 'client_npn_protocol': s.selected_npn_protocol(),
2485 'version': s.version(),
2486 'session_reused': s.session_reused,
2487 'session': s.session,
2488 })
2489 s.close()
2490 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2491 stats['server_npn_protocols'] = server.selected_npn_protocols
2492 stats['server_shared_ciphers'] = server.shared_ciphers
2493 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002494
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002495def try_protocol_combo(server_protocol, client_protocol, expect_success,
2496 certsreqs=None, server_options=0, client_options=0):
2497 """
2498 Try to SSL-connect using *client_protocol* to *server_protocol*.
2499 If *expect_success* is true, assert that the connection succeeds,
2500 if it's false, assert that the connection fails.
2501 Also, if *expect_success* is a string, assert that it is the protocol
2502 version actually used by the connection.
2503 """
2504 if certsreqs is None:
2505 certsreqs = ssl.CERT_NONE
2506 certtype = {
2507 ssl.CERT_NONE: "CERT_NONE",
2508 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2509 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2510 }[certsreqs]
2511 if support.verbose:
2512 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2513 sys.stdout.write(formatstr %
2514 (ssl.get_protocol_name(client_protocol),
2515 ssl.get_protocol_name(server_protocol),
2516 certtype))
2517 client_context = ssl.SSLContext(client_protocol)
2518 client_context.options |= client_options
2519 server_context = ssl.SSLContext(server_protocol)
2520 server_context.options |= server_options
2521
2522 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2523 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2524 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002525 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002526 client_context.set_ciphers("ALL")
2527
2528 for ctx in (client_context, server_context):
2529 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002530 ctx.load_cert_chain(SIGNED_CERTFILE)
2531 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002532 try:
2533 stats = server_params_test(client_context, server_context,
2534 chatty=False, connectionchatty=False)
2535 # Protocol mismatch can result in either an SSLError, or a
2536 # "Connection reset by peer" error.
2537 except ssl.SSLError:
2538 if expect_success:
2539 raise
2540 except OSError as e:
2541 if expect_success or e.errno != errno.ECONNRESET:
2542 raise
2543 else:
2544 if not expect_success:
2545 raise AssertionError(
2546 "Client protocol %s succeeded with server protocol %s!"
2547 % (ssl.get_protocol_name(client_protocol),
2548 ssl.get_protocol_name(server_protocol)))
2549 elif (expect_success is not True
2550 and expect_success != stats['version']):
2551 raise AssertionError("version mismatch: expected %r, got %r"
2552 % (expect_success, stats['version']))
2553
2554
2555class ThreadedTests(unittest.TestCase):
2556
2557 @skip_if_broken_ubuntu_ssl
2558 def test_echo(self):
2559 """Basic test of an SSL client connecting to a server"""
2560 if support.verbose:
2561 sys.stdout.write("\n")
2562 for protocol in PROTOCOLS:
2563 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2564 continue
2565 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2566 context = ssl.SSLContext(protocol)
2567 context.load_cert_chain(CERTFILE)
2568 server_params_test(context, context,
2569 chatty=True, connectionchatty=True)
2570
Christian Heimesa170fa12017-09-15 20:27:30 +02002571 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002572
2573 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2574 server_params_test(client_context=client_context,
2575 server_context=server_context,
2576 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002577 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002578
2579 client_context.check_hostname = False
2580 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2581 with self.assertRaises(ssl.SSLError) as e:
2582 server_params_test(client_context=server_context,
2583 server_context=client_context,
2584 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002585 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002586 self.assertIn('called a function you should not call',
2587 str(e.exception))
2588
2589 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2590 with self.assertRaises(ssl.SSLError) as e:
2591 server_params_test(client_context=server_context,
2592 server_context=server_context,
2593 chatty=True, connectionchatty=True)
2594 self.assertIn('called a function you should not call',
2595 str(e.exception))
2596
2597 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2598 with self.assertRaises(ssl.SSLError) as e:
2599 server_params_test(client_context=server_context,
2600 server_context=client_context,
2601 chatty=True, connectionchatty=True)
2602 self.assertIn('called a function you should not call',
2603 str(e.exception))
2604
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002605 def test_getpeercert(self):
2606 if support.verbose:
2607 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002608
2609 client_context, server_context, hostname = testing_context()
2610 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002612 with client_context.wrap_socket(socket.socket(),
2613 do_handshake_on_connect=False,
2614 server_hostname=hostname) as s:
2615 s.connect((HOST, server.port))
2616 # getpeercert() raise ValueError while the handshake isn't
2617 # done.
2618 with self.assertRaises(ValueError):
2619 s.getpeercert()
2620 s.do_handshake()
2621 cert = s.getpeercert()
2622 self.assertTrue(cert, "Can't get peer certificate.")
2623 cipher = s.cipher()
2624 if support.verbose:
2625 sys.stdout.write(pprint.pformat(cert) + '\n')
2626 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2627 if 'subject' not in cert:
2628 self.fail("No subject field in certificate: %s." %
2629 pprint.pformat(cert))
2630 if ((('organizationName', 'Python Software Foundation'),)
2631 not in cert['subject']):
2632 self.fail(
2633 "Missing or invalid 'organizationName' field in certificate subject; "
2634 "should be 'Python Software Foundation'.")
2635 self.assertIn('notBefore', cert)
2636 self.assertIn('notAfter', cert)
2637 before = ssl.cert_time_to_seconds(cert['notBefore'])
2638 after = ssl.cert_time_to_seconds(cert['notAfter'])
2639 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002640
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002641 @unittest.skipUnless(have_verify_flags(),
2642 "verify_flags need OpenSSL > 0.9.8")
2643 def test_crl_check(self):
2644 if support.verbose:
2645 sys.stdout.write("\n")
2646
Christian Heimesa170fa12017-09-15 20:27:30 +02002647 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002648
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002649 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002650 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651
2652 # VERIFY_DEFAULT should pass
2653 server = ThreadedEchoServer(context=server_context, chatty=True)
2654 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002655 with client_context.wrap_socket(socket.socket(),
2656 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002657 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002658 cert = s.getpeercert()
2659 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002660
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002661 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002662 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002663
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664 server = ThreadedEchoServer(context=server_context, chatty=True)
2665 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002666 with client_context.wrap_socket(socket.socket(),
2667 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002668 with self.assertRaisesRegex(ssl.SSLError,
2669 "certificate verify failed"):
2670 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002671
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002673 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002674
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002675 server = ThreadedEchoServer(context=server_context, chatty=True)
2676 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002677 with client_context.wrap_socket(socket.socket(),
2678 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002679 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002680 cert = s.getpeercert()
2681 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002682
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683 def test_check_hostname(self):
2684 if support.verbose:
2685 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002686
Christian Heimesa170fa12017-09-15 20:27:30 +02002687 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002688
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002689 # correct hostname should verify
2690 server = ThreadedEchoServer(context=server_context, chatty=True)
2691 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002692 with client_context.wrap_socket(socket.socket(),
2693 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002694 s.connect((HOST, server.port))
2695 cert = s.getpeercert()
2696 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002697
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 # incorrect hostname should raise an exception
2699 server = ThreadedEchoServer(context=server_context, chatty=True)
2700 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002701 with client_context.wrap_socket(socket.socket(),
2702 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002703 with self.assertRaisesRegex(
2704 ssl.CertificateError,
2705 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002707
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002708 # missing server_hostname arg should cause an exception, too
2709 server = ThreadedEchoServer(context=server_context, chatty=True)
2710 with server:
2711 with socket.socket() as s:
2712 with self.assertRaisesRegex(ValueError,
2713 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002714 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002715
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002716 def test_ecc_cert(self):
2717 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2718 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002719 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002720 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2721
2722 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2723 # load ECC cert
2724 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2725
2726 # correct hostname should verify
2727 server = ThreadedEchoServer(context=server_context, chatty=True)
2728 with server:
2729 with client_context.wrap_socket(socket.socket(),
2730 server_hostname=hostname) as s:
2731 s.connect((HOST, server.port))
2732 cert = s.getpeercert()
2733 self.assertTrue(cert, "Can't get peer certificate.")
2734 cipher = s.cipher()[0].split('-')
2735 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2736
2737 def test_dual_rsa_ecc(self):
2738 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2739 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002740 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2741 # algorithms.
2742 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002743 # only ECDSA certs
2744 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2745 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2746
2747 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2748 # load ECC and RSA key/cert pairs
2749 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2750 server_context.load_cert_chain(SIGNED_CERTFILE)
2751
2752 # correct hostname should verify
2753 server = ThreadedEchoServer(context=server_context, chatty=True)
2754 with server:
2755 with client_context.wrap_socket(socket.socket(),
2756 server_hostname=hostname) as s:
2757 s.connect((HOST, server.port))
2758 cert = s.getpeercert()
2759 self.assertTrue(cert, "Can't get peer certificate.")
2760 cipher = s.cipher()[0].split('-')
2761 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2762
Christian Heimes66e57422018-01-29 14:25:13 +01002763 def test_check_hostname_idn(self):
2764 if support.verbose:
2765 sys.stdout.write("\n")
2766
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002767 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002768 server_context.load_cert_chain(IDNSANSFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002769 # TODO: fix TLSv1.3 support
2770 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimes66e57422018-01-29 14:25:13 +01002771
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002772 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002773 context.verify_mode = ssl.CERT_REQUIRED
2774 context.check_hostname = True
2775 context.load_verify_locations(SIGNING_CA)
2776
2777 # correct hostname should verify, when specified in several
2778 # different ways
2779 idn_hostnames = [
2780 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002781 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002782 ('xn--knig-5qa.idn.pythontest.net',
2783 'xn--knig-5qa.idn.pythontest.net'),
2784 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002785 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002786
2787 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002788 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002789 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2790 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2791 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002792 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2793
2794 # ('königsgäßchen.idna2008.pythontest.net',
2795 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2796 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2797 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2798 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2799 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2800
Christian Heimes66e57422018-01-29 14:25:13 +01002801 ]
2802 for server_hostname, expected_hostname in idn_hostnames:
2803 server = ThreadedEchoServer(context=server_context, chatty=True)
2804 with server:
2805 with context.wrap_socket(socket.socket(),
2806 server_hostname=server_hostname) as s:
2807 self.assertEqual(s.server_hostname, expected_hostname)
2808 s.connect((HOST, server.port))
2809 cert = s.getpeercert()
2810 self.assertEqual(s.server_hostname, expected_hostname)
2811 self.assertTrue(cert, "Can't get peer certificate.")
2812
Christian Heimes66e57422018-01-29 14:25:13 +01002813 # incorrect hostname should raise an exception
2814 server = ThreadedEchoServer(context=server_context, chatty=True)
2815 with server:
2816 with context.wrap_socket(socket.socket(),
2817 server_hostname="python.example.org") as s:
2818 with self.assertRaises(ssl.CertificateError):
2819 s.connect((HOST, server.port))
2820
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002821 def test_wrong_cert(self):
2822 """Connecting when the server rejects the client's certificate
2823
2824 Launch a server with CERT_REQUIRED, and check that trying to
2825 connect to it with a wrong client certificate fails.
2826 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002827 client_context, server_context, hostname = testing_context()
2828 # load client cert
2829 client_context.load_cert_chain(WRONG_CERT)
2830 # require TLS client authentication
2831 server_context.verify_mode = ssl.CERT_REQUIRED
2832 # TODO: fix TLSv1.3 support
2833 # With TLS 1.3, test fails with exception in server thread
2834 server_context.options |= ssl.OP_NO_TLSv1_3
2835
2836 server = ThreadedEchoServer(
2837 context=server_context, chatty=True, connectionchatty=True,
2838 )
2839
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002840 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002841 client_context.wrap_socket(socket.socket(),
2842 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002843 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002844 # Expect either an SSL error about the server rejecting
2845 # the connection, or a low-level connection reset (which
2846 # sometimes happens on Windows)
2847 s.connect((HOST, server.port))
2848 except ssl.SSLError as e:
2849 if support.verbose:
2850 sys.stdout.write("\nSSLError is %r\n" % e)
2851 except OSError as e:
2852 if e.errno != errno.ECONNRESET:
2853 raise
2854 if support.verbose:
2855 sys.stdout.write("\nsocket.error is %r\n" % e)
2856 else:
2857 self.fail("Use of invalid cert should have failed!")
2858
2859 def test_rude_shutdown(self):
2860 """A brutal shutdown of an SSL server should raise an OSError
2861 in the client when attempting handshake.
2862 """
2863 listener_ready = threading.Event()
2864 listener_gone = threading.Event()
2865
2866 s = socket.socket()
2867 port = support.bind_port(s, HOST)
2868
2869 # `listener` runs in a thread. It sits in an accept() until
2870 # the main thread connects. Then it rudely closes the socket,
2871 # and sets Event `listener_gone` to let the main thread know
2872 # the socket is gone.
2873 def listener():
2874 s.listen()
2875 listener_ready.set()
2876 newsock, addr = s.accept()
2877 newsock.close()
2878 s.close()
2879 listener_gone.set()
2880
2881 def connector():
2882 listener_ready.wait()
2883 with socket.socket() as c:
2884 c.connect((HOST, port))
2885 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002886 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002887 ssl_sock = test_wrap_socket(c)
2888 except OSError:
2889 pass
2890 else:
2891 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002892
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002893 t = threading.Thread(target=listener)
2894 t.start()
2895 try:
2896 connector()
2897 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002898 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002899
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002900 def test_ssl_cert_verify_error(self):
2901 if support.verbose:
2902 sys.stdout.write("\n")
2903
2904 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2905 server_context.load_cert_chain(SIGNED_CERTFILE)
2906
2907 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2908
2909 server = ThreadedEchoServer(context=server_context, chatty=True)
2910 with server:
2911 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002912 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002913 try:
2914 s.connect((HOST, server.port))
2915 except ssl.SSLError as e:
2916 msg = 'unable to get local issuer certificate'
2917 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2918 self.assertEqual(e.verify_code, 20)
2919 self.assertEqual(e.verify_message, msg)
2920 self.assertIn(msg, repr(e))
2921 self.assertIn('certificate verify failed', repr(e))
2922
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002923 @skip_if_broken_ubuntu_ssl
2924 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2925 "OpenSSL is compiled without SSLv2 support")
2926 def test_protocol_sslv2(self):
2927 """Connecting to an SSLv2 server with various client options"""
2928 if support.verbose:
2929 sys.stdout.write("\n")
2930 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2931 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2932 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002933 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2935 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2936 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2937 # SSLv23 client with specific SSL options
2938 if no_sslv2_implies_sslv3_hello():
2939 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002940 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002941 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002942 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002943 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002944 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002945 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002946
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002947 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002948 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 """Connecting to an SSLv23 server with various client options"""
2950 if support.verbose:
2951 sys.stdout.write("\n")
2952 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002953 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002954 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002955 except OSError as x:
2956 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2957 if support.verbose:
2958 sys.stdout.write(
2959 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2960 % str(x))
2961 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002962 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2963 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2964 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002965
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002966 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002967 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2968 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2969 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002970
2971 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002972 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2973 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2974 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002975
2976 # Server with specific SSL options
2977 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002978 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002979 server_options=ssl.OP_NO_SSLv3)
2980 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002981 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002982 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002983 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002984 server_options=ssl.OP_NO_TLSv1)
2985
2986
2987 @skip_if_broken_ubuntu_ssl
2988 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2989 "OpenSSL is compiled without SSLv3 support")
2990 def test_protocol_sslv3(self):
2991 """Connecting to an SSLv3 server with various client options"""
2992 if support.verbose:
2993 sys.stdout.write("\n")
2994 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2995 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2996 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2997 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2998 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002999 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003000 client_options=ssl.OP_NO_SSLv3)
3001 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3002 if no_sslv2_implies_sslv3_hello():
3003 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003004 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003005 False, client_options=ssl.OP_NO_SSLv2)
3006
3007 @skip_if_broken_ubuntu_ssl
3008 def test_protocol_tlsv1(self):
3009 """Connecting to a TLSv1 server with various client options"""
3010 if support.verbose:
3011 sys.stdout.write("\n")
3012 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3013 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3014 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3015 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3016 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3017 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3018 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003019 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003020 client_options=ssl.OP_NO_TLSv1)
3021
3022 @skip_if_broken_ubuntu_ssl
3023 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3024 "TLS version 1.1 not supported.")
3025 def test_protocol_tlsv1_1(self):
3026 """Connecting to a TLSv1.1 server with various client options.
3027 Testing against older TLS versions."""
3028 if support.verbose:
3029 sys.stdout.write("\n")
3030 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3031 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3032 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3033 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3034 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003035 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003036 client_options=ssl.OP_NO_TLSv1_1)
3037
Christian Heimesa170fa12017-09-15 20:27:30 +02003038 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003039 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3040 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3041
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003042 @skip_if_broken_ubuntu_ssl
3043 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3044 "TLS version 1.2 not supported.")
3045 def test_protocol_tlsv1_2(self):
3046 """Connecting to a TLSv1.2 server with various client options.
3047 Testing against older TLS versions."""
3048 if support.verbose:
3049 sys.stdout.write("\n")
3050 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3051 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3052 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3053 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3054 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3055 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3056 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003057 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003058 client_options=ssl.OP_NO_TLSv1_2)
3059
Christian Heimesa170fa12017-09-15 20:27:30 +02003060 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003061 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3062 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3063 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3064 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3065
3066 def test_starttls(self):
3067 """Switching from clear text to encrypted and back again."""
3068 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3069
3070 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003071 starttls_server=True,
3072 chatty=True,
3073 connectionchatty=True)
3074 wrapped = False
3075 with server:
3076 s = socket.socket()
3077 s.setblocking(1)
3078 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003079 if support.verbose:
3080 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003081 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003082 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003083 sys.stdout.write(
3084 " client: sending %r...\n" % indata)
3085 if wrapped:
3086 conn.write(indata)
3087 outdata = conn.read()
3088 else:
3089 s.send(indata)
3090 outdata = s.recv(1024)
3091 msg = outdata.strip().lower()
3092 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3093 # STARTTLS ok, switch to secure mode
3094 if support.verbose:
3095 sys.stdout.write(
3096 " client: read %r from server, starting TLS...\n"
3097 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003098 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003099 wrapped = True
3100 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3101 # ENDTLS ok, switch back to clear text
3102 if support.verbose:
3103 sys.stdout.write(
3104 " client: read %r from server, ending TLS...\n"
3105 % msg)
3106 s = conn.unwrap()
3107 wrapped = False
3108 else:
3109 if support.verbose:
3110 sys.stdout.write(
3111 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003112 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003113 sys.stdout.write(" client: closing connection.\n")
3114 if wrapped:
3115 conn.write(b"over\n")
3116 else:
3117 s.send(b"over\n")
3118 if wrapped:
3119 conn.close()
3120 else:
3121 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003122
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003123 def test_socketserver(self):
3124 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003125 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003126 # try to connect
3127 if support.verbose:
3128 sys.stdout.write('\n')
3129 with open(CERTFILE, 'rb') as f:
3130 d1 = f.read()
3131 d2 = ''
3132 # now fetch the same data from the HTTPS server
3133 url = 'https://localhost:%d/%s' % (
3134 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003135 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003136 f = urllib.request.urlopen(url, context=context)
3137 try:
3138 dlen = f.info().get("content-length")
3139 if dlen and (int(dlen) > 0):
3140 d2 = f.read(int(dlen))
3141 if support.verbose:
3142 sys.stdout.write(
3143 " client: read %d bytes from remote server '%s'\n"
3144 % (len(d2), server))
3145 finally:
3146 f.close()
3147 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003148
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003149 def test_asyncore_server(self):
3150 """Check the example asyncore integration."""
3151 if support.verbose:
3152 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003153
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003154 indata = b"FOO\n"
3155 server = AsyncoreEchoServer(CERTFILE)
3156 with server:
3157 s = test_wrap_socket(socket.socket())
3158 s.connect(('127.0.0.1', server.port))
3159 if support.verbose:
3160 sys.stdout.write(
3161 " client: sending %r...\n" % indata)
3162 s.write(indata)
3163 outdata = s.read()
3164 if support.verbose:
3165 sys.stdout.write(" client: read %r\n" % outdata)
3166 if outdata != indata.lower():
3167 self.fail(
3168 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3169 % (outdata[:20], len(outdata),
3170 indata[:20].lower(), len(indata)))
3171 s.write(b"over\n")
3172 if support.verbose:
3173 sys.stdout.write(" client: closing connection.\n")
3174 s.close()
3175 if support.verbose:
3176 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003178 def test_recv_send(self):
3179 """Test recv(), send() and friends."""
3180 if support.verbose:
3181 sys.stdout.write("\n")
3182
3183 server = ThreadedEchoServer(CERTFILE,
3184 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003185 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003186 cacerts=CERTFILE,
3187 chatty=True,
3188 connectionchatty=False)
3189 with server:
3190 s = test_wrap_socket(socket.socket(),
3191 server_side=False,
3192 certfile=CERTFILE,
3193 ca_certs=CERTFILE,
3194 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003195 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003196 s.connect((HOST, server.port))
3197 # helper methods for standardising recv* method signatures
3198 def _recv_into():
3199 b = bytearray(b"\0"*100)
3200 count = s.recv_into(b)
3201 return b[:count]
3202
3203 def _recvfrom_into():
3204 b = bytearray(b"\0"*100)
3205 count, addr = s.recvfrom_into(b)
3206 return b[:count]
3207
3208 # (name, method, expect success?, *args, return value func)
3209 send_methods = [
3210 ('send', s.send, True, [], len),
3211 ('sendto', s.sendto, False, ["some.address"], len),
3212 ('sendall', s.sendall, True, [], lambda x: None),
3213 ]
3214 # (name, method, whether to expect success, *args)
3215 recv_methods = [
3216 ('recv', s.recv, True, []),
3217 ('recvfrom', s.recvfrom, False, ["some.address"]),
3218 ('recv_into', _recv_into, True, []),
3219 ('recvfrom_into', _recvfrom_into, False, []),
3220 ]
3221 data_prefix = "PREFIX_"
3222
3223 for (meth_name, send_meth, expect_success, args,
3224 ret_val_meth) in send_methods:
3225 indata = (data_prefix + meth_name).encode('ascii')
3226 try:
3227 ret = send_meth(indata, *args)
3228 msg = "sending with {}".format(meth_name)
3229 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3230 outdata = s.read()
3231 if outdata != indata.lower():
3232 self.fail(
3233 "While sending with <<{name:s}>> bad data "
3234 "<<{outdata:r}>> ({nout:d}) received; "
3235 "expected <<{indata:r}>> ({nin:d})\n".format(
3236 name=meth_name, outdata=outdata[:20],
3237 nout=len(outdata),
3238 indata=indata[:20], nin=len(indata)
3239 )
3240 )
3241 except ValueError as e:
3242 if expect_success:
3243 self.fail(
3244 "Failed to send with method <<{name:s}>>; "
3245 "expected to succeed.\n".format(name=meth_name)
3246 )
3247 if not str(e).startswith(meth_name):
3248 self.fail(
3249 "Method <<{name:s}>> failed with unexpected "
3250 "exception message: {exp:s}\n".format(
3251 name=meth_name, exp=e
3252 )
3253 )
3254
3255 for meth_name, recv_meth, expect_success, args in recv_methods:
3256 indata = (data_prefix + meth_name).encode('ascii')
3257 try:
3258 s.send(indata)
3259 outdata = recv_meth(*args)
3260 if outdata != indata.lower():
3261 self.fail(
3262 "While receiving with <<{name:s}>> bad data "
3263 "<<{outdata:r}>> ({nout:d}) received; "
3264 "expected <<{indata:r}>> ({nin:d})\n".format(
3265 name=meth_name, outdata=outdata[:20],
3266 nout=len(outdata),
3267 indata=indata[:20], nin=len(indata)
3268 )
3269 )
3270 except ValueError as e:
3271 if expect_success:
3272 self.fail(
3273 "Failed to receive with method <<{name:s}>>; "
3274 "expected to succeed.\n".format(name=meth_name)
3275 )
3276 if not str(e).startswith(meth_name):
3277 self.fail(
3278 "Method <<{name:s}>> failed with unexpected "
3279 "exception message: {exp:s}\n".format(
3280 name=meth_name, exp=e
3281 )
3282 )
3283 # consume data
3284 s.read()
3285
3286 # read(-1, buffer) is supported, even though read(-1) is not
3287 data = b"data"
3288 s.send(data)
3289 buffer = bytearray(len(data))
3290 self.assertEqual(s.read(-1, buffer), len(data))
3291 self.assertEqual(buffer, data)
3292
Christian Heimes888bbdc2017-09-07 14:18:21 -07003293 # sendall accepts bytes-like objects
3294 if ctypes is not None:
3295 ubyte = ctypes.c_ubyte * len(data)
3296 byteslike = ubyte.from_buffer_copy(data)
3297 s.sendall(byteslike)
3298 self.assertEqual(s.read(), data)
3299
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 # Make sure sendmsg et al are disallowed to avoid
3301 # inadvertent disclosure of data and/or corruption
3302 # of the encrypted data stream
3303 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3304 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3305 self.assertRaises(NotImplementedError,
3306 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307 s.write(b"over\n")
3308
3309 self.assertRaises(ValueError, s.recv, -1)
3310 self.assertRaises(ValueError, s.read, -1)
3311
3312 s.close()
3313
3314 def test_recv_zero(self):
3315 server = ThreadedEchoServer(CERTFILE)
3316 server.__enter__()
3317 self.addCleanup(server.__exit__, None, None)
3318 s = socket.create_connection((HOST, server.port))
3319 self.addCleanup(s.close)
3320 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3321 self.addCleanup(s.close)
3322
3323 # recv/read(0) should return no data
3324 s.send(b"data")
3325 self.assertEqual(s.recv(0), b"")
3326 self.assertEqual(s.read(0), b"")
3327 self.assertEqual(s.read(), b"data")
3328
3329 # Should not block if the other end sends no data
3330 s.setblocking(False)
3331 self.assertEqual(s.recv(0), b"")
3332 self.assertEqual(s.recv_into(bytearray()), 0)
3333
3334 def test_nonblocking_send(self):
3335 server = ThreadedEchoServer(CERTFILE,
3336 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003337 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003338 cacerts=CERTFILE,
3339 chatty=True,
3340 connectionchatty=False)
3341 with server:
3342 s = test_wrap_socket(socket.socket(),
3343 server_side=False,
3344 certfile=CERTFILE,
3345 ca_certs=CERTFILE,
3346 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003347 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003348 s.connect((HOST, server.port))
3349 s.setblocking(False)
3350
3351 # If we keep sending data, at some point the buffers
3352 # will be full and the call will block
3353 buf = bytearray(8192)
3354 def fill_buffer():
3355 while True:
3356 s.send(buf)
3357 self.assertRaises((ssl.SSLWantWriteError,
3358 ssl.SSLWantReadError), fill_buffer)
3359
3360 # Now read all the output and discard it
3361 s.setblocking(True)
3362 s.close()
3363
3364 def test_handshake_timeout(self):
3365 # Issue #5103: SSL handshake must respect the socket timeout
3366 server = socket.socket(socket.AF_INET)
3367 host = "127.0.0.1"
3368 port = support.bind_port(server)
3369 started = threading.Event()
3370 finish = False
3371
3372 def serve():
3373 server.listen()
3374 started.set()
3375 conns = []
3376 while not finish:
3377 r, w, e = select.select([server], [], [], 0.1)
3378 if server in r:
3379 # Let the socket hang around rather than having
3380 # it closed by garbage collection.
3381 conns.append(server.accept()[0])
3382 for sock in conns:
3383 sock.close()
3384
3385 t = threading.Thread(target=serve)
3386 t.start()
3387 started.wait()
3388
3389 try:
3390 try:
3391 c = socket.socket(socket.AF_INET)
3392 c.settimeout(0.2)
3393 c.connect((host, port))
3394 # Will attempt handshake and time out
3395 self.assertRaisesRegex(socket.timeout, "timed out",
3396 test_wrap_socket, c)
3397 finally:
3398 c.close()
3399 try:
3400 c = socket.socket(socket.AF_INET)
3401 c = test_wrap_socket(c)
3402 c.settimeout(0.2)
3403 # Will attempt handshake and time out
3404 self.assertRaisesRegex(socket.timeout, "timed out",
3405 c.connect, (host, port))
3406 finally:
3407 c.close()
3408 finally:
3409 finish = True
3410 t.join()
3411 server.close()
3412
3413 def test_server_accept(self):
3414 # Issue #16357: accept() on a SSLSocket created through
3415 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003416 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003417 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003418 context.load_verify_locations(SIGNING_CA)
3419 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003420 server = socket.socket(socket.AF_INET)
3421 host = "127.0.0.1"
3422 port = support.bind_port(server)
3423 server = context.wrap_socket(server, server_side=True)
3424 self.assertTrue(server.server_side)
3425
3426 evt = threading.Event()
3427 remote = None
3428 peer = None
3429 def serve():
3430 nonlocal remote, peer
3431 server.listen()
3432 # Block on the accept and wait on the connection to close.
3433 evt.set()
3434 remote, peer = server.accept()
3435 remote.recv(1)
3436
3437 t = threading.Thread(target=serve)
3438 t.start()
3439 # Client wait until server setup and perform a connect.
3440 evt.wait()
3441 client = context.wrap_socket(socket.socket())
3442 client.connect((host, port))
3443 client_addr = client.getsockname()
3444 client.close()
3445 t.join()
3446 remote.close()
3447 server.close()
3448 # Sanity checks.
3449 self.assertIsInstance(remote, ssl.SSLSocket)
3450 self.assertEqual(peer, client_addr)
3451
3452 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003453 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003454 with context.wrap_socket(socket.socket()) as sock:
3455 with self.assertRaises(OSError) as cm:
3456 sock.getpeercert()
3457 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3458
3459 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003460 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003461 with context.wrap_socket(socket.socket()) as sock:
3462 with self.assertRaises(OSError) as cm:
3463 sock.do_handshake()
3464 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3465
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003466 def test_no_shared_ciphers(self):
3467 client_context, server_context, hostname = testing_context()
3468 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3469 client_context.options |= ssl.OP_NO_TLSv1_3
3470 # Force different suites on client and master
3471 client_context.set_ciphers("AES128")
3472 server_context.set_ciphers("AES256")
3473 with ThreadedEchoServer(context=server_context) as server:
3474 with client_context.wrap_socket(socket.socket(),
3475 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003476 with self.assertRaises(OSError):
3477 s.connect((HOST, server.port))
3478 self.assertIn("no shared cipher", server.conn_errors[0])
3479
3480 def test_version_basic(self):
3481 """
3482 Basic tests for SSLSocket.version().
3483 More tests are done in the test_protocol_*() methods.
3484 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003485 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3486 context.check_hostname = False
3487 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003488 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003489 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003490 chatty=False) as server:
3491 with context.wrap_socket(socket.socket()) as s:
3492 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003493 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003494 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003495 if ssl.OPENSSL_VERSION_INFO >= (1, 1, 1):
3496 self.assertEqual(s.version(), 'TLSv1.3')
3497 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003498 self.assertEqual(s.version(), 'TLSv1.2')
3499 else: # 0.9.8 to 1.0.1
3500 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003501 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003502 self.assertIs(s.version(), None)
3503
Christian Heimescb5b68a2017-09-07 18:07:00 -07003504 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3505 "test requires TLSv1.3 enabled OpenSSL")
3506 def test_tls1_3(self):
3507 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3508 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003509 context.options |= (
3510 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3511 )
3512 with ThreadedEchoServer(context=context) as server:
3513 with context.wrap_socket(socket.socket()) as s:
3514 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003515 self.assertIn(s.cipher()[0], {
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003516 'TLS_AES_256_GCM_SHA384',
3517 'TLS_CHACHA20_POLY1305_SHA256',
3518 'TLS_AES_128_GCM_SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003519 })
3520 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003521
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08003522 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3523 "required OpenSSL 1.1.0g")
3524 def test_min_max_version(self):
3525 client_context, server_context, hostname = testing_context()
3526 # client TLSv1.0 to 1.2
3527 client_context.minimum_version = ssl.TLSVersion.TLSv1
3528 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3529 # server only TLSv1.2
3530 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3531 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3532
3533 with ThreadedEchoServer(context=server_context) as server:
3534 with client_context.wrap_socket(socket.socket(),
3535 server_hostname=hostname) as s:
3536 s.connect((HOST, server.port))
3537 self.assertEqual(s.version(), 'TLSv1.2')
3538
3539 # client 1.0 to 1.2, server 1.0 to 1.1
3540 server_context.minimum_version = ssl.TLSVersion.TLSv1
3541 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3542
3543 with ThreadedEchoServer(context=server_context) as server:
3544 with client_context.wrap_socket(socket.socket(),
3545 server_hostname=hostname) as s:
3546 s.connect((HOST, server.port))
3547 self.assertEqual(s.version(), 'TLSv1.1')
3548
3549 # client 1.0, server 1.2 (mismatch)
3550 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3551 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3552 client_context.minimum_version = ssl.TLSVersion.TLSv1
3553 client_context.maximum_version = ssl.TLSVersion.TLSv1
3554 with ThreadedEchoServer(context=server_context) as server:
3555 with client_context.wrap_socket(socket.socket(),
3556 server_hostname=hostname) as s:
3557 with self.assertRaises(ssl.SSLError) as e:
3558 s.connect((HOST, server.port))
3559 self.assertIn("alert", str(e.exception))
3560
3561
3562 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3563 "required OpenSSL 1.1.0g")
3564 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3565 def test_min_max_version_sslv3(self):
3566 client_context, server_context, hostname = testing_context()
3567 server_context.minimum_version = ssl.TLSVersion.SSLv3
3568 client_context.minimum_version = ssl.TLSVersion.SSLv3
3569 client_context.maximum_version = ssl.TLSVersion.SSLv3
3570 with ThreadedEchoServer(context=server_context) as server:
3571 with client_context.wrap_socket(socket.socket(),
3572 server_hostname=hostname) as s:
3573 s.connect((HOST, server.port))
3574 self.assertEqual(s.version(), 'SSLv3')
3575
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003576 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3577 def test_default_ecdh_curve(self):
3578 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3579 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003580 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003581 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003582 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3583 # cipher name.
3584 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003585 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3586 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3587 # our default cipher list should prefer ECDH-based ciphers
3588 # automatically.
3589 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3590 context.set_ciphers("ECCdraft:ECDH")
3591 with ThreadedEchoServer(context=context) as server:
3592 with context.wrap_socket(socket.socket()) as s:
3593 s.connect((HOST, server.port))
3594 self.assertIn("ECDH", s.cipher()[0])
3595
3596 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3597 "'tls-unique' channel binding not available")
3598 def test_tls_unique_channel_binding(self):
3599 """Test tls-unique channel binding."""
3600 if support.verbose:
3601 sys.stdout.write("\n")
3602
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003603 client_context, server_context, hostname = testing_context()
3604 # TODO: fix TLSv1.3 support
3605 client_context.options |= ssl.OP_NO_TLSv1_3
3606
3607 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 chatty=True,
3609 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003610
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003611 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003612 with client_context.wrap_socket(
3613 socket.socket(),
3614 server_hostname=hostname) as s:
3615 s.connect((HOST, server.port))
3616 # get the data
3617 cb_data = s.get_channel_binding("tls-unique")
3618 if support.verbose:
3619 sys.stdout.write(
3620 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003621
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003622 # check if it is sane
3623 self.assertIsNotNone(cb_data)
3624 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003625
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003626 # and compare with the peers version
3627 s.write(b"CB tls-unique\n")
3628 peer_data_repr = s.read().strip()
3629 self.assertEqual(peer_data_repr,
3630 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003631
3632 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003633 with client_context.wrap_socket(
3634 socket.socket(),
3635 server_hostname=hostname) as s:
3636 s.connect((HOST, server.port))
3637 new_cb_data = s.get_channel_binding("tls-unique")
3638 if support.verbose:
3639 sys.stdout.write(
3640 "got another channel binding data: {0!r}\n".format(
3641 new_cb_data)
3642 )
3643 # is it really unique
3644 self.assertNotEqual(cb_data, new_cb_data)
3645 self.assertIsNotNone(cb_data)
3646 self.assertEqual(len(cb_data), 12) # True for TLSv1
3647 s.write(b"CB tls-unique\n")
3648 peer_data_repr = s.read().strip()
3649 self.assertEqual(peer_data_repr,
3650 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003651
3652 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003653 client_context, server_context, hostname = testing_context()
3654 stats = server_params_test(client_context, server_context,
3655 chatty=True, connectionchatty=True,
3656 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003657 if support.verbose:
3658 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3659 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3660
3661 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3662 "ssl.OP_NO_COMPRESSION needed for this test")
3663 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003664 client_context, server_context, hostname = testing_context()
3665 client_context.options |= ssl.OP_NO_COMPRESSION
3666 server_context.options |= ssl.OP_NO_COMPRESSION
3667 stats = server_params_test(client_context, server_context,
3668 chatty=True, connectionchatty=True,
3669 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003670 self.assertIs(stats['compression'], None)
3671
3672 def test_dh_params(self):
3673 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003674 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003675 # test scenario needs TLS <= 1.2
3676 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003677 server_context.load_dh_params(DHFILE)
3678 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003679 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003680 stats = server_params_test(client_context, server_context,
3681 chatty=True, connectionchatty=True,
3682 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003683 cipher = stats["cipher"][0]
3684 parts = cipher.split("-")
3685 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3686 self.fail("Non-DH cipher: " + cipher[0])
3687
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003688 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003689 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003690 def test_ecdh_curve(self):
3691 # server secp384r1, client auto
3692 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003693
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003694 server_context.set_ecdh_curve("secp384r1")
3695 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3696 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3697 stats = server_params_test(client_context, server_context,
3698 chatty=True, connectionchatty=True,
3699 sni_name=hostname)
3700
3701 # server auto, client secp384r1
3702 client_context, server_context, hostname = testing_context()
3703 client_context.set_ecdh_curve("secp384r1")
3704 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3705 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3706 stats = server_params_test(client_context, server_context,
3707 chatty=True, connectionchatty=True,
3708 sni_name=hostname)
3709
3710 # server / client curve mismatch
3711 client_context, server_context, hostname = testing_context()
3712 client_context.set_ecdh_curve("prime256v1")
3713 server_context.set_ecdh_curve("secp384r1")
3714 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3715 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3716 try:
3717 stats = server_params_test(client_context, server_context,
3718 chatty=True, connectionchatty=True,
3719 sni_name=hostname)
3720 except ssl.SSLError:
3721 pass
3722 else:
3723 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003724 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003725 self.fail("mismatch curve did not fail")
3726
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727 def test_selected_alpn_protocol(self):
3728 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003729 client_context, server_context, hostname = testing_context()
3730 stats = server_params_test(client_context, server_context,
3731 chatty=True, connectionchatty=True,
3732 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003733 self.assertIs(stats['client_alpn_protocol'], None)
3734
3735 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3736 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3737 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003738 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003739 server_context.set_alpn_protocols(['foo', 'bar'])
3740 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003741 chatty=True, connectionchatty=True,
3742 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003743 self.assertIs(stats['client_alpn_protocol'], None)
3744
3745 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3746 def test_alpn_protocols(self):
3747 server_protocols = ['foo', 'bar', 'milkshake']
3748 protocol_tests = [
3749 (['foo', 'bar'], 'foo'),
3750 (['bar', 'foo'], 'foo'),
3751 (['milkshake'], 'milkshake'),
3752 (['http/3.0', 'http/4.0'], None)
3753 ]
3754 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003755 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003756 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003757 client_context.set_alpn_protocols(client_protocols)
3758
3759 try:
3760 stats = server_params_test(client_context,
3761 server_context,
3762 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003763 connectionchatty=True,
3764 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003765 except ssl.SSLError as e:
3766 stats = e
3767
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003768 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3770 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3771 self.assertIsInstance(stats, ssl.SSLError)
3772 else:
3773 msg = "failed trying %s (s) and %s (c).\n" \
3774 "was expecting %s, but got %%s from the %%s" \
3775 % (str(server_protocols), str(client_protocols),
3776 str(expected))
3777 client_result = stats['client_alpn_protocol']
3778 self.assertEqual(client_result, expected,
3779 msg % (client_result, "client"))
3780 server_result = stats['server_alpn_protocols'][-1] \
3781 if len(stats['server_alpn_protocols']) else 'nothing'
3782 self.assertEqual(server_result, expected,
3783 msg % (server_result, "server"))
3784
3785 def test_selected_npn_protocol(self):
3786 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003787 client_context, server_context, hostname = testing_context()
3788 stats = server_params_test(client_context, server_context,
3789 chatty=True, connectionchatty=True,
3790 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003791 self.assertIs(stats['client_npn_protocol'], None)
3792
3793 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3794 def test_npn_protocols(self):
3795 server_protocols = ['http/1.1', 'spdy/2']
3796 protocol_tests = [
3797 (['http/1.1', 'spdy/2'], 'http/1.1'),
3798 (['spdy/2', 'http/1.1'], 'http/1.1'),
3799 (['spdy/2', 'test'], 'spdy/2'),
3800 (['abc', 'def'], 'abc')
3801 ]
3802 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003803 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003804 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003805 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003806 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003807 chatty=True, connectionchatty=True,
3808 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003809 msg = "failed trying %s (s) and %s (c).\n" \
3810 "was expecting %s, but got %%s from the %%s" \
3811 % (str(server_protocols), str(client_protocols),
3812 str(expected))
3813 client_result = stats['client_npn_protocol']
3814 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3815 server_result = stats['server_npn_protocols'][-1] \
3816 if len(stats['server_npn_protocols']) else 'nothing'
3817 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3818
3819 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003820 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003821 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003822 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003823 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003824 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003825 client_context.load_verify_locations(SIGNING_CA)
3826 return server_context, other_context, client_context
3827
3828 def check_common_name(self, stats, name):
3829 cert = stats['peercert']
3830 self.assertIn((('commonName', name),), cert['subject'])
3831
3832 @needs_sni
3833 def test_sni_callback(self):
3834 calls = []
3835 server_context, other_context, client_context = self.sni_contexts()
3836
Christian Heimesa170fa12017-09-15 20:27:30 +02003837 client_context.check_hostname = False
3838
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003839 def servername_cb(ssl_sock, server_name, initial_context):
3840 calls.append((server_name, initial_context))
3841 if server_name is not None:
3842 ssl_sock.context = other_context
3843 server_context.set_servername_callback(servername_cb)
3844
3845 stats = server_params_test(client_context, server_context,
3846 chatty=True,
3847 sni_name='supermessage')
3848 # The hostname was fetched properly, and the certificate was
3849 # changed for the connection.
3850 self.assertEqual(calls, [("supermessage", server_context)])
3851 # CERTFILE4 was selected
3852 self.check_common_name(stats, 'fakehostname')
3853
3854 calls = []
3855 # The callback is called with server_name=None
3856 stats = server_params_test(client_context, server_context,
3857 chatty=True,
3858 sni_name=None)
3859 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003860 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003861
3862 # Check disabling the callback
3863 calls = []
3864 server_context.set_servername_callback(None)
3865
3866 stats = server_params_test(client_context, server_context,
3867 chatty=True,
3868 sni_name='notfunny')
3869 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003870 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003871 self.assertEqual(calls, [])
3872
3873 @needs_sni
3874 def test_sni_callback_alert(self):
3875 # Returning a TLS alert is reflected to the connecting client
3876 server_context, other_context, client_context = self.sni_contexts()
3877
3878 def cb_returning_alert(ssl_sock, server_name, initial_context):
3879 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3880 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003881 with self.assertRaises(ssl.SSLError) as cm:
3882 stats = server_params_test(client_context, server_context,
3883 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003884 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003885 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003886
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003887 @needs_sni
3888 def test_sni_callback_raising(self):
3889 # Raising fails the connection with a TLS handshake failure alert.
3890 server_context, other_context, client_context = self.sni_contexts()
3891
3892 def cb_raising(ssl_sock, server_name, initial_context):
3893 1/0
3894 server_context.set_servername_callback(cb_raising)
3895
3896 with self.assertRaises(ssl.SSLError) as cm, \
3897 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003898 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003899 chatty=False,
3900 sni_name='supermessage')
3901 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3902 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003903
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003904 @needs_sni
3905 def test_sni_callback_wrong_return_type(self):
3906 # Returning the wrong return type terminates the TLS connection
3907 # with an internal error alert.
3908 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003909
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3911 return "foo"
3912 server_context.set_servername_callback(cb_wrong_return_type)
3913
3914 with self.assertRaises(ssl.SSLError) as cm, \
3915 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003916 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003917 chatty=False,
3918 sni_name='supermessage')
3919 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3920 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003921
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003922 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003923 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003924 client_context.set_ciphers("AES128:AES256")
3925 server_context.set_ciphers("AES256")
3926 expected_algs = [
3927 "AES256", "AES-256",
3928 # TLS 1.3 ciphers are always enabled
3929 "TLS_CHACHA20", "TLS_AES",
3930 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003931
Christian Heimesa170fa12017-09-15 20:27:30 +02003932 stats = server_params_test(client_context, server_context,
3933 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003934 ciphers = stats['server_shared_ciphers'][0]
3935 self.assertGreater(len(ciphers), 0)
3936 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003937 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003938 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003939
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003940 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003941 client_context, server_context, hostname = testing_context()
3942 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003943
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003944 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003945 s = client_context.wrap_socket(socket.socket(),
3946 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003947 s.connect((HOST, server.port))
3948 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003949
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003950 self.assertRaises(ValueError, s.read, 1024)
3951 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003952
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003953 def test_sendfile(self):
3954 TEST_DATA = b"x" * 512
3955 with open(support.TESTFN, 'wb') as f:
3956 f.write(TEST_DATA)
3957 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003958 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003959 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003960 context.load_verify_locations(SIGNING_CA)
3961 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003962 server = ThreadedEchoServer(context=context, chatty=False)
3963 with server:
3964 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003965 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003966 with open(support.TESTFN, 'rb') as file:
3967 s.sendfile(file)
3968 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003969
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003970 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003971 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003972 # TODO: sessions aren't compatible with TLSv1.3 yet
3973 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003974
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003975 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003976 stats = server_params_test(client_context, server_context,
3977 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003978 session = stats['session']
3979 self.assertTrue(session.id)
3980 self.assertGreater(session.time, 0)
3981 self.assertGreater(session.timeout, 0)
3982 self.assertTrue(session.has_ticket)
3983 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3984 self.assertGreater(session.ticket_lifetime_hint, 0)
3985 self.assertFalse(stats['session_reused'])
3986 sess_stat = server_context.session_stats()
3987 self.assertEqual(sess_stat['accept'], 1)
3988 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003989
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003990 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003991 stats = server_params_test(client_context, server_context,
3992 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003993 sess_stat = server_context.session_stats()
3994 self.assertEqual(sess_stat['accept'], 2)
3995 self.assertEqual(sess_stat['hits'], 1)
3996 self.assertTrue(stats['session_reused'])
3997 session2 = stats['session']
3998 self.assertEqual(session2.id, session.id)
3999 self.assertEqual(session2, session)
4000 self.assertIsNot(session2, session)
4001 self.assertGreaterEqual(session2.time, session.time)
4002 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004003
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004004 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004005 stats = server_params_test(client_context, server_context,
4006 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004007 self.assertFalse(stats['session_reused'])
4008 session3 = stats['session']
4009 self.assertNotEqual(session3.id, session.id)
4010 self.assertNotEqual(session3, session)
4011 sess_stat = server_context.session_stats()
4012 self.assertEqual(sess_stat['accept'], 3)
4013 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004014
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004015 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004016 stats = server_params_test(client_context, server_context,
4017 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004018 self.assertTrue(stats['session_reused'])
4019 session4 = stats['session']
4020 self.assertEqual(session4.id, session.id)
4021 self.assertEqual(session4, session)
4022 self.assertGreaterEqual(session4.time, session.time)
4023 self.assertGreaterEqual(session4.timeout, session.timeout)
4024 sess_stat = server_context.session_stats()
4025 self.assertEqual(sess_stat['accept'], 4)
4026 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004027
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004028 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004029 client_context, server_context, hostname = testing_context()
4030 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004031
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004032 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004033 client_context.options |= ssl.OP_NO_TLSv1_3
4034 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004035
Christian Heimesa170fa12017-09-15 20:27:30 +02004036 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004037 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004038 with client_context.wrap_socket(socket.socket(),
4039 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004040 # session is None before handshake
4041 self.assertEqual(s.session, None)
4042 self.assertEqual(s.session_reused, None)
4043 s.connect((HOST, server.port))
4044 session = s.session
4045 self.assertTrue(session)
4046 with self.assertRaises(TypeError) as e:
4047 s.session = object
4048 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004049
Christian Heimesa170fa12017-09-15 20:27:30 +02004050 with client_context.wrap_socket(socket.socket(),
4051 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004052 s.connect((HOST, server.port))
4053 # cannot set session after handshake
4054 with self.assertRaises(ValueError) as e:
4055 s.session = session
4056 self.assertEqual(str(e.exception),
4057 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004058
Christian Heimesa170fa12017-09-15 20:27:30 +02004059 with client_context.wrap_socket(socket.socket(),
4060 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004061 # can set session before handshake and before the
4062 # connection was established
4063 s.session = session
4064 s.connect((HOST, server.port))
4065 self.assertEqual(s.session.id, session.id)
4066 self.assertEqual(s.session, session)
4067 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004068
Christian Heimesa170fa12017-09-15 20:27:30 +02004069 with client_context2.wrap_socket(socket.socket(),
4070 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004071 # cannot re-use session with a different SSLContext
4072 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004073 s.session = session
4074 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004075 self.assertEqual(str(e.exception),
4076 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004077
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004078
Thomas Woutersed03b412007-08-28 21:37:11 +00004079def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004080 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004081 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004082 plats = {
4083 'Linux': platform.linux_distribution,
4084 'Mac': platform.mac_ver,
4085 'Windows': platform.win32_ver,
4086 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03004087 with warnings.catch_warnings():
4088 warnings.filterwarnings(
4089 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04004090 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03004091 'functions are deprecated .*',
4092 PendingDeprecationWarning,
4093 )
4094 for name, func in plats.items():
4095 plat = func()
4096 if plat and plat[0]:
4097 plat = '%s %r' % (name, plat)
4098 break
4099 else:
4100 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004101 print("test_ssl: testing with %r %r" %
4102 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4103 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004104 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004105 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4106 try:
4107 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4108 except AttributeError:
4109 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004110
Antoine Pitrou152efa22010-05-16 18:19:27 +00004111 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004112 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004113 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004114 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004115 BADCERT, BADKEY, EMPTYCERT]:
4116 if not os.path.exists(filename):
4117 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004118
Martin Panter3840b2a2016-03-27 01:53:46 +00004119 tests = [
4120 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004121 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00004122 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004123
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004124 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004125 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004126
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004127 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004128 try:
4129 support.run_unittest(*tests)
4130 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004131 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004132
4133if __name__ == "__main__":
4134 test_main()