blob: 5c22630d190c2cb9d4342af29e9d0e840abf6e36 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
# Sorted keys of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host name supplied by test.support.
HOST = support.HOST
# True when the version string reported by the ssl module starts with
# 'LibreSSL'; the two OpenSSL flags below are forced to False in that case.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Value of the build configuration variable of the same name (None if unset).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return a path made of this file's directory joined with *name*.

    Any number of path components may be given; they are joined in order.
    """
    test_dir = os.path.dirname(__file__)
    return os.path.join(test_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Each BYTES_* name is the os.fsencode()d form of the str path above it.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# Files with ".passwd" in their name; KEY_PASSWORD is presumably the
# passphrase protecting them -- confirm against make_ssl_certs.py.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# The "capath" data directory and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
# Expected result of decoding CERTFILE (keycert.pem); test_parse_cert
# compares the decoded certificate against this dict for exact equality.
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Jan 17 19:09:06 2028 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': 'F9BA076D5B6ABD9B',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
# Hostname that testing_context() returns when given SIGNED_CERTFILE.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
# Expected result of decoding SIGNED_CERTFILE (keycert3.pem); test_parse_cert
# compares the decoded certificate against this dict for exact equality.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Nov 28 19:09:06 2027 GMT',
    'notBefore': 'Jan 19 19:09:06 2018 GMT',
    'serialNumber': '82EDBF41C880919C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
99
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Hostname that testing_context() returns when given SIGNED_CERTFILE2.
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

# Name of a remote host (presumably for network tests -- confirm at use site).
REMOTE_HOST = "self-signed.pythontest.net"

# Files for negative and parsing tests: NONEXISTINGCERT is expected to be
# missing (test_errors_sslwrap checks for ENOENT), NOKIACERT is decoded by
# test_parse_cert and NULLBYTECERT by test_parse_cert_CVE_2013_4238.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Presumably Diffie-Hellman parameters, judging by the file name -- confirm.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
# Not defined in all versions of OpenSSL
# (getattr() falls back to 0 when the ssl module lacks the constant).
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    Nothing is written unless support.verbose is set.  The traceback is
    taken from sys.exc_info(), so this is meant to be called while an
    exception is being handled.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
143
def have_verify_flags():
    """Return True when the linked OpenSSL is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
147
def _have_secp_curves():
    """Probe whether a server-side context accepts the secp384r1 curve.

    Returns False when the ssl module reports no ECDH support or when
    set_ecdh_curve() rejects the curve name with ValueError.
    """
    if ssl.HAS_ECDH:
        probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        try:
            probe.set_ecdh_curve("secp384r1")
            return True
        except ValueError:
            pass
    return False


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
161
162
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
168
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    The string is returned unchanged unless the OpenSSL API version is
    exactly 0.9.8i; for that version the seconds are zeroed and a leading
    zero in the day field is replaced by a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):  # 0.9.8.i
        layout = "%b %d %H:%M:%S %Y GMT"
        parsed = datetime.datetime.strptime(cert_time, layout)
        cert_time = parsed.replace(second=0).strftime(layout)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            head, tail = cert_time[:4], cert_time[5:]
            cert_time = head + " " + tail

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator skipping *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned as is.
    Otherwise the wrapper tries to create an SSLv2 context before each call
    and raises unittest.SkipTest if that fails on OpenSSL 0.9.8o with the
    'debian squeeze/sid' distribution signature.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # The distribution is only queried when the version matches.
            if ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15):
                distro = platform.linux_distribution()
                if distro == ('debian', 'squeeze/sid', ''):
                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper


# Decorator skipping a test when the ssl module reports no SNI support.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
200
Antoine Pitrou23df4832010-08-04 17:14:06 +0000201
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext.

    The context is created for *ssl_version* and configured from the
    keyword-only arguments; any remaining keyword arguments are passed on
    to SSLContext.wrap_socket().  Returns the wrapped socket.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # Hostname checking is switched off before verification is disabled.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    wrapped = ctx.wrap_socket(sock, **kwargs)
    return wrapped
218
Christian Heimesa170fa12017-09-15 20:27:30 +0200219
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one associated
    with the chosen certificate.  Both contexts trust SIGNING_CA.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    # Previously this line loaded the CA into client_context a second time,
    # leaving the server context without any trusted CA.
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
240
241
Antoine Pitrou152efa22010-05-16 18:19:27 +0000242class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000243
Antoine Pitrou480a1242010-04-28 21:37:09 +0000244 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000245 ssl.CERT_NONE
246 ssl.CERT_OPTIONAL
247 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100248 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100249 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100250 if ssl.HAS_ECDH:
251 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100252 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
253 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000254 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100255 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700256 ssl.OP_NO_SSLv2
257 ssl.OP_NO_SSLv3
258 ssl.OP_NO_TLSv1
259 ssl.OP_NO_TLSv1_3
260 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
261 ssl.OP_NO_TLSv1_1
262 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200263 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000264
Christian Heimes89c20512018-02-27 11:17:32 +0100265 def test_private_init(self):
266 with self.assertRaisesRegex(TypeError, "public constructor"):
267 with socket.socket() as s:
268 ssl.SSLSocket(s)
269
Antoine Pitrou172f0252014-04-18 20:33:08 +0200270 def test_str_for_enums(self):
271 # Make sure that the PROTOCOL_* constants have enum-like string
272 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200273 proto = ssl.PROTOCOL_TLS
274 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200275 ctx = ssl.SSLContext(proto)
276 self.assertIs(ctx.protocol, proto)
277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n RAND_status is %d (%s)\n"
282 % (v, (v and "sufficient randomness") or
283 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200284
285 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
286 self.assertEqual(len(data), 16)
287 self.assertEqual(is_cryptographic, v == 1)
288 if v:
289 data = ssl.RAND_bytes(16)
290 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200291 else:
292 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200293
Victor Stinner1e81a392013-12-19 16:47:04 +0100294 # negative num is invalid
295 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
296 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
297
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100298 if hasattr(ssl, 'RAND_egd'):
299 self.assertRaises(TypeError, ssl.RAND_egd, 1)
300 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000301 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200302 ssl.RAND_add(b"this is a random bytes object", 75.0)
303 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000304
Christian Heimesf77b4b22013-08-21 13:26:05 +0200305 @unittest.skipUnless(os.name == 'posix', 'requires posix')
306 def test_random_fork(self):
307 status = ssl.RAND_status()
308 if not status:
309 self.fail("OpenSSL's PRNG has insufficient randomness")
310
311 rfd, wfd = os.pipe()
312 pid = os.fork()
313 if pid == 0:
314 try:
315 os.close(rfd)
316 child_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(child_random), 16)
318 os.write(wfd, child_random)
319 os.close(wfd)
320 except BaseException:
321 os._exit(1)
322 else:
323 os._exit(0)
324 else:
325 os.close(wfd)
326 self.addCleanup(os.close, rfd)
327 _, status = os.waitpid(pid, 0)
328 self.assertEqual(status, 0)
329
330 child_random = os.read(rfd, 16)
331 self.assertEqual(len(child_random), 16)
332 parent_random = ssl.RAND_pseudo_bytes(16)[0]
333 self.assertEqual(len(parent_random), 16)
334
335 self.assertNotEqual(child_random, parent_random)
336
Antoine Pitrou480a1242010-04-28 21:37:09 +0000337 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000338 # note that this uses an 'unofficial' function in _ssl.c,
339 # provided solely for this test, to exercise the certificate
340 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100341 self.assertEqual(
342 ssl._ssl._test_decode_cert(CERTFILE),
343 CERTFILE_INFO
344 )
345 self.assertEqual(
346 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
347 SIGNED_CERTFILE_INFO
348 )
349
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200350 # Issue #13034: the subjectAltName in some certificates
351 # (notably projects.developer.nokia.com:443) wasn't parsed
352 p = ssl._ssl._test_decode_cert(NOKIACERT)
353 if support.verbose:
354 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
355 self.assertEqual(p['subjectAltName'],
356 (('DNS', 'projects.developer.nokia.com'),
357 ('DNS', 'projects.forum.nokia.com'))
358 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100359 # extra OCSP and AIA fields
360 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
361 self.assertEqual(p['caIssuers'],
362 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
363 self.assertEqual(p['crlDistributionPoints'],
364 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000365
Christian Heimes824f7f32013-08-17 00:54:47 +0200366 def test_parse_cert_CVE_2013_4238(self):
367 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
368 if support.verbose:
369 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
370 subject = ((('countryName', 'US'),),
371 (('stateOrProvinceName', 'Oregon'),),
372 (('localityName', 'Beaverton'),),
373 (('organizationName', 'Python Software Foundation'),),
374 (('organizationalUnitName', 'Python Core Development'),),
375 (('commonName', 'null.python.org\x00example.org'),),
376 (('emailAddress', 'python-dev@python.org'),))
377 self.assertEqual(p['subject'], subject)
378 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200379 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
380 san = (('DNS', 'altnull.python.org\x00example.com'),
381 ('email', 'null@python.org\x00user@example.org'),
382 ('URI', 'http://null.python.org\x00http://example.org'),
383 ('IP Address', '192.0.2.1'),
384 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
385 else:
386 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
387 san = (('DNS', 'altnull.python.org\x00example.com'),
388 ('email', 'null@python.org\x00user@example.org'),
389 ('URI', 'http://null.python.org\x00http://example.org'),
390 ('IP Address', '192.0.2.1'),
391 ('IP Address', '<invalid>'))
392
393 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200394
Christian Heimes1c03abd2016-09-06 23:25:35 +0200395 def test_parse_all_sans(self):
396 p = ssl._ssl._test_decode_cert(ALLSANFILE)
397 self.assertEqual(p['subjectAltName'],
398 (
399 ('DNS', 'allsans'),
400 ('othername', '<unsupported>'),
401 ('othername', '<unsupported>'),
402 ('email', 'user@example.org'),
403 ('DNS', 'www.example.org'),
404 ('DirName',
405 ((('countryName', 'XY'),),
406 (('localityName', 'Castle Anthrax'),),
407 (('organizationName', 'Python Software Foundation'),),
408 (('commonName', 'dirname example'),))),
409 ('URI', 'https://www.python.org/'),
410 ('IP Address', '127.0.0.1'),
411 ('IP Address', '0:0:0:0:0:0:0:1\n'),
412 ('Registered ID', '1.2.3.4.5')
413 )
414 )
415
Antoine Pitrou480a1242010-04-28 21:37:09 +0000416 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000417 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000418 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000419 d1 = ssl.PEM_cert_to_DER_cert(pem)
420 p2 = ssl.DER_cert_to_PEM_cert(d1)
421 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000422 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000423 if not p2.startswith(ssl.PEM_HEADER + '\n'):
424 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
425 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
426 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000427
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000428 def test_openssl_version(self):
429 n = ssl.OPENSSL_VERSION_NUMBER
430 t = ssl.OPENSSL_VERSION_INFO
431 s = ssl.OPENSSL_VERSION
432 self.assertIsInstance(n, int)
433 self.assertIsInstance(t, tuple)
434 self.assertIsInstance(s, str)
435 # Some sanity checks follow
436 # >= 0.9
437 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400438 # < 3.0
439 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000440 major, minor, fix, patch, status = t
441 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400442 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000443 self.assertGreaterEqual(minor, 0)
444 self.assertLess(minor, 256)
445 self.assertGreaterEqual(fix, 0)
446 self.assertLess(fix, 256)
447 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100448 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000449 self.assertGreaterEqual(status, 0)
450 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400451 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200452 if IS_LIBRESSL:
453 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100454 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400455 else:
456 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100457 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000458
Antoine Pitrou9d543662010-04-23 23:10:32 +0000459 @support.cpython_only
460 def test_refcycle(self):
461 # Issue #7943: an SSL object doesn't create reference cycles with
462 # itself.
463 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200464 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000465 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100466 with support.check_warnings(("", ResourceWarning)):
467 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100468 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000469
Antoine Pitroua468adc2010-09-14 14:43:44 +0000470 def test_wrapped_unconnected(self):
471 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200472 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000473 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200474 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100475 self.assertRaises(OSError, ss.recv, 1)
476 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
477 self.assertRaises(OSError, ss.recvfrom, 1)
478 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
479 self.assertRaises(OSError, ss.send, b'x')
480 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800481 self.assertRaises(NotImplementedError, ss.sendmsg,
482 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000483
Antoine Pitrou40f08742010-04-24 22:04:40 +0000484 def test_timeout(self):
485 # Issue #8524: when creating an SSL socket, the timeout of the
486 # original socket should be retained.
487 for timeout in (None, 0.0, 5.0):
488 s = socket.socket(socket.AF_INET)
489 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200490 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100491 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000492
Christian Heimesd0486372016-09-10 23:23:33 +0200493 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000494 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000495 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000496 "certfile must be specified",
497 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000498 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000499 "certfile must be specified for server-side operations",
500 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000501 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000502 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200503 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100504 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
505 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200506 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200507 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000508 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000509 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000510 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200511 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000512 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000513 ssl.wrap_socket(sock,
514 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000515 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200516 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000517 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000518 ssl.wrap_socket(sock,
519 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000520 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000521
Martin Panter3464ea22016-02-01 21:58:11 +0000522 def bad_cert_test(self, certfile):
523 """Check that trying to use the given client certificate fails"""
524 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
525 certfile)
526 sock = socket.socket()
527 self.addCleanup(sock.close)
528 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200529 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200530 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000531
532 def test_empty_cert(self):
533 """Wrapping with an empty cert file"""
534 self.bad_cert_test("nullcert.pem")
535
536 def test_malformed_cert(self):
537 """Wrapping with a badly formatted certificate (syntax error)"""
538 self.bad_cert_test("badcert.pem")
539
540 def test_malformed_key(self):
541 """Wrapping with a badly formatted key (syntax error)"""
542 self.bad_cert_test("badkey.pem")
543
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000544 def test_match_hostname(self):
545 def ok(cert, hostname):
546 ssl.match_hostname(cert, hostname)
547 def fail(cert, hostname):
548 self.assertRaises(ssl.CertificateError,
549 ssl.match_hostname, cert, hostname)
550
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100551 # -- Hostname matching --
552
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000553 cert = {'subject': ((('commonName', 'example.com'),),)}
554 ok(cert, 'example.com')
555 ok(cert, 'ExAmple.cOm')
556 fail(cert, 'www.example.com')
557 fail(cert, '.example.com')
558 fail(cert, 'example.org')
559 fail(cert, 'exampleXcom')
560
561 cert = {'subject': ((('commonName', '*.a.com'),),)}
562 ok(cert, 'foo.a.com')
563 fail(cert, 'bar.foo.a.com')
564 fail(cert, 'a.com')
565 fail(cert, 'Xa.com')
566 fail(cert, '.a.com')
567
Mandeep Singhede2ac92017-11-27 04:01:27 +0530568 # only match wildcards when they are the only thing
569 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000570 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530571 fail(cert, 'foo.com')
572 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000573 fail(cert, 'bar.com')
574 fail(cert, 'foo.a.com')
575 fail(cert, 'bar.foo.com')
576
Christian Heimes824f7f32013-08-17 00:54:47 +0200577 # NULL bytes are bad, CVE-2013-4073
578 cert = {'subject': ((('commonName',
579 'null.python.org\x00example.org'),),)}
580 ok(cert, 'null.python.org\x00example.org') # or raise an error?
581 fail(cert, 'example.org')
582 fail(cert, 'null.python.org')
583
Georg Brandl72c98d32013-10-27 07:16:53 +0100584 # error cases with wildcards
585 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
586 fail(cert, 'bar.foo.a.com')
587 fail(cert, 'a.com')
588 fail(cert, 'Xa.com')
589 fail(cert, '.a.com')
590
591 cert = {'subject': ((('commonName', 'a.*.com'),),)}
592 fail(cert, 'a.foo.com')
593 fail(cert, 'a..com')
594 fail(cert, 'a.com')
595
596 # wildcard doesn't match IDNA prefix 'xn--'
597 idna = 'püthon.python.org'.encode("idna").decode("ascii")
598 cert = {'subject': ((('commonName', idna),),)}
599 ok(cert, idna)
600 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
601 fail(cert, idna)
602 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
603 fail(cert, idna)
604
605 # wildcard in first fragment and IDNA A-labels in sequent fragments
606 # are supported.
607 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
608 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530609 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
610 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100611 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
612 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
613
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000614 # Slightly fake real-world example
615 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
616 'subject': ((('commonName', 'linuxfrz.org'),),),
617 'subjectAltName': (('DNS', 'linuxfr.org'),
618 ('DNS', 'linuxfr.com'),
619 ('othername', '<unsupported>'))}
620 ok(cert, 'linuxfr.org')
621 ok(cert, 'linuxfr.com')
622 # Not a "DNS" entry
623 fail(cert, '<unsupported>')
624 # When there is a subjectAltName, commonName isn't used
625 fail(cert, 'linuxfrz.org')
626
627 # A pristine real-world example
628 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
629 'subject': ((('countryName', 'US'),),
630 (('stateOrProvinceName', 'California'),),
631 (('localityName', 'Mountain View'),),
632 (('organizationName', 'Google Inc'),),
633 (('commonName', 'mail.google.com'),))}
634 ok(cert, 'mail.google.com')
635 fail(cert, 'gmail.com')
636 # Only commonName is considered
637 fail(cert, 'California')
638
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100639 # -- IPv4 matching --
640 cert = {'subject': ((('commonName', 'example.com'),),),
641 'subjectAltName': (('DNS', 'example.com'),
642 ('IP Address', '10.11.12.13'),
643 ('IP Address', '14.15.16.17'))}
644 ok(cert, '10.11.12.13')
645 ok(cert, '14.15.16.17')
646 fail(cert, '14.15.16.18')
647 fail(cert, 'example.net')
648
649 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800650 if hasattr(socket, 'AF_INET6'):
651 cert = {'subject': ((('commonName', 'example.com'),),),
652 'subjectAltName': (
653 ('DNS', 'example.com'),
654 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
655 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
656 ok(cert, '2001::cafe')
657 ok(cert, '2003::baba')
658 fail(cert, '2003::bebe')
659 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100660
661 # -- Miscellaneous --
662
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000663 # Neither commonName nor subjectAltName
664 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
665 'subject': ((('countryName', 'US'),),
666 (('stateOrProvinceName', 'California'),),
667 (('localityName', 'Mountain View'),),
668 (('organizationName', 'Google Inc'),))}
669 fail(cert, 'mail.google.com')
670
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200671 # No DNS entry in subjectAltName but a commonName
672 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
673 'subject': ((('countryName', 'US'),),
674 (('stateOrProvinceName', 'California'),),
675 (('localityName', 'Mountain View'),),
676 (('commonName', 'mail.google.com'),)),
677 'subjectAltName': (('othername', 'blabla'), )}
678 ok(cert, 'mail.google.com')
679
680 # No DNS entry subjectAltName and no commonName
681 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
682 'subject': ((('countryName', 'US'),),
683 (('stateOrProvinceName', 'California'),),
684 (('localityName', 'Mountain View'),),
685 (('organizationName', 'Google Inc'),)),
686 'subjectAltName': (('othername', 'blabla'),)}
687 fail(cert, 'google.com')
688
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000689 # Empty cert / no cert
690 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
691 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
692
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200693 # Issue #17980: avoid denials of service by refusing more than one
694 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800695 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
696 with self.assertRaisesRegex(
697 ssl.CertificateError,
698 "partial wildcards in leftmost label are not supported"):
699 ssl.match_hostname(cert, 'axxb.example.com')
700
701 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
702 with self.assertRaisesRegex(
703 ssl.CertificateError,
704 "wildcard can only be present in the leftmost label"):
705 ssl.match_hostname(cert, 'www.sub.example.com')
706
707 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
708 with self.assertRaisesRegex(
709 ssl.CertificateError,
710 "too many wildcards"):
711 ssl.match_hostname(cert, 'axxbxxc.example.com')
712
713 cert = {'subject': ((('commonName', '*'),),)}
714 with self.assertRaisesRegex(
715 ssl.CertificateError,
716 "sole wildcard without additional labels are not support"):
717 ssl.match_hostname(cert, 'host')
718
719 cert = {'subject': ((('commonName', '*.com'),),)}
720 with self.assertRaisesRegex(
721 ssl.CertificateError,
722 r"hostname 'com' doesn't match '\*.com'"):
723 ssl.match_hostname(cert, 'com')
724
725 # extra checks for _inet_paton()
726 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
727 with self.assertRaises(ValueError):
728 ssl._inet_paton(invalid)
729 for ipaddr in ['127.0.0.1', '192.168.0.1']:
730 self.assertTrue(ssl._inet_paton(ipaddr))
731 if hasattr(socket, 'AF_INET6'):
732 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
733 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200734
Antoine Pitroud5323212010-10-22 18:19:07 +0000735 def test_server_side(self):
736 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200737 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000738 with socket.socket() as sock:
739 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
740 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000741
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    s = socket.socket(socket.AF_INET)
    # Register the close right away so the listening socket is released
    # even if bind/listen/connect or the assertion below fails.
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    # Covers the window where connect() fails before the socket has been
    # handed to the wrapper; closing an already closed socket is harmless.
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200753
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected socket should return None for a known binding type,
    # both for a client-side wrap and for a server-side wrap.
    client_kwargs = {}
    server_kwargs = {"server_side": True, "certfile": CERTFILE}
    for wrap_kwargs in (client_kwargs, server_kwargs):
        plain = socket.socket(socket.AF_INET)
        with test_wrap_socket(plain, **wrap_kwargs) as wrapped:
            binding = wrapped.get_channel_binding("tls-unique")
            self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200765
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    description = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Release the only reference, then force a collection while the
        # warning is still being recorded.
        del wrapped
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(description, message)
773
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths record.
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    # With SSL_CERT_DIR / SSL_CERT_FILE set, the reported capath / cafile
    # must be the values taken from the environment.
    overrides = {"SSL_CERT_DIR": CAPATH, "SSL_CERT_FILE": CERTFILE}
    with support.EnvironmentVarGuard() as env:
        for variable, location in overrides.items():
            env[variable] = location
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
785
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    store_names = ("CA", "ROOT")
    # Both stores are expected to be non-empty.
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    # A store name is mandatory and must not be empty.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Every entry is a (cert bytes, encoding name, trust) triple; collect
    # all trust OIDs that are reported as sets.
    seen_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            der, encoding, trust = entry
            self.assertIsInstance(der, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200810
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    # A store name is mandatory and must not be empty.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Every entry is a (CRL bytes, encoding name) pair.
    crl_entries = ssl.enum_crls("CA")
    self.assertIsInstance(crl_entries, list)
    for entry in crl_entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        blob, encoding = entry
        self.assertIsInstance(blob, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200824
Christian Heimes46bebee2013-06-09 19:03:31 +0200825
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100826 def test_asn1object(self):
827 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
828 '1.3.6.1.5.5.7.3.1')
829
830 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
831 self.assertEqual(val, expected)
832 self.assertEqual(val.nid, 129)
833 self.assertEqual(val.shortname, 'serverAuth')
834 self.assertEqual(val.longname, 'TLS Web Server Authentication')
835 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
836 self.assertIsInstance(val, ssl._ASN1Object)
837 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
838
839 val = ssl._ASN1Object.fromnid(129)
840 self.assertEqual(val, expected)
841 self.assertIsInstance(val, ssl._ASN1Object)
842 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100843 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
844 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100845 for i in range(1000):
846 try:
847 obj = ssl._ASN1Object.fromnid(i)
848 except ValueError:
849 pass
850 else:
851 self.assertIsInstance(obj.nid, int)
852 self.assertIsInstance(obj.shortname, str)
853 self.assertIsInstance(obj.longname, str)
854 self.assertIsInstance(obj.oid, (str, type(None)))
855
856 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
857 self.assertEqual(val, expected)
858 self.assertIsInstance(val, ssl._ASN1Object)
859 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
860 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
861 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100862 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
863 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100864
Christian Heimes72d28502013-11-23 13:56:58 +0100865 def test_purpose_enum(self):
866 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
867 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
868 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
869 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
872 '1.3.6.1.5.5.7.3.1')
873
874 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
875 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
876 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
877 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
880 '1.3.6.1.5.5.7.3.2')
881
def test_unsupported_dtls(self):
    # Datagram sockets cannot be wrapped, neither through the module-level
    # helper nor through an explicit context.
    dgram = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram.close)
    expected_message = "only stream sockets are supported"
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    wrappers = (
        lambda: test_wrap_socket(dgram, cert_reqs=ssl.CERT_NONE),
        lambda: client_ctx.wrap_socket(dgram),
    )
    for wrap in wrappers:
        with self.assertRaises(NotImplementedError) as caught:
            wrap()
        self.assertEqual(str(caught.exception), expected_message)
892
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must parse to exactly *timestamp* seconds.
    parsed = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(parsed, timestamp)
895
def cert_time_fail(self, timestring):
    # Helper: parsing *timestring* must be rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
899
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_instants = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for text, seconds in known_instants:
        self.cert_time_ok(text, seconds)
907
908 def test_cert_time_to_seconds(self):
909 timestring = "Jan 5 09:34:43 2018 GMT"
910 ts = 1515144883.0
911 self.cert_time_ok(timestring, ts)
912 # accept keyword parameter, assert its name
913 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
914 # accept both %e and %d (space or zero generated by strftime)
915 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
916 # case-insensitive
917 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
918 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
919 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
920 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
921 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
922 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
923 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
924 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
925
926 newyear_ts = 1230768000.0
927 # leap seconds
928 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
929 # same timestamp
930 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
931
932 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
933 # allow 60th second (even if it is not a leap second)
934 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
935 # allow 2nd leap second for compatibility with time.strptime()
936 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
937 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
938
Mike53f7a7c2017-12-14 14:04:53 +0300939 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200940 # 99991231235959Z (rfc 5280)
941 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
942
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as formatted by strftime() under the
    # locale selected by the decorator.
    february_tuple = (1, 2, 3, 4, 5, 6, 0, 0, 0)
    local_february = time.strftime('%b', february_tuple)

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent: the English abbreviation parses, the localized
    # one is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    localized = time.strftime('%b', february_tuple) + " 9 00:00:00 2007 GMT"
    self.cert_time_fail(localized)
957
def test_connect_ex_error(self):
    # connect_ex() to a bound but non-listening port must return one of
    # the expected error codes.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    outcome = client.connect_ex((HOST, port))
    self.assertIn(outcome, acceptable)
973
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100974
Antoine Pitrou152efa22010-05-16 18:19:27 +0000975class ContextTests(unittest.TestCase):
976
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for known in PROTOCOLS:
        ssl.SSLContext(known)
    # With no argument the context uses PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
985
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute reports the constant the context was built with.
    for wanted in PROTOCOLS:
        created = ssl.SSLContext(wanted)
        self.assertEqual(created.protocol, wanted)
991
992 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200993 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000994 ctx.set_ciphers("ALL")
995 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000996 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000997 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000998
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of these markers may appear in the name of any cipher suite
    # enabled on a fresh client context.
    banned_markers = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for marker in banned_markers:
            self.assertNotIn(marker, suite_name)
1011
Christian Heimes25bfcd52016-09-06 00:04:45 +02001012 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1013 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001014 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001015 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001016 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001017 self.assertIn('AES256-GCM-SHA384', names)
1018 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001019
@skip_if_broken_ubuntu_ssl
def test_options(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    also_enabled = (
        OP_NO_COMPRESSION,
        OP_CIPHER_SERVER_PREFERENCE,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_ENABLE_MIDDLEBOX_COMPAT,
    )
    for flag in also_enabled:
        expected |= flag
    self.assertEqual(expected, client_ctx.options)

    # Adding a bit is always possible.
    client_ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    # Removing bits only works when can_clear_options() reports support;
    # otherwise the assignment must raise ValueError.
    if not can_clear_options():
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return
    client_ctx.options = client_ctx.options & ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
1041
Christian Heimesa170fa12017-09-15 20:27:30 +02001042 def test_verify_mode_protocol(self):
1043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001044 # Default value
1045 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1046 ctx.verify_mode = ssl.CERT_OPTIONAL
1047 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1048 ctx.verify_mode = ssl.CERT_REQUIRED
1049 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1050 ctx.verify_mode = ssl.CERT_NONE
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1052 with self.assertRaises(TypeError):
1053 ctx.verify_mode = None
1054 with self.assertRaises(ValueError):
1055 ctx.verify_mode = 42
1056
Christian Heimesa170fa12017-09-15 20:27:30 +02001057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1058 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1059 self.assertFalse(ctx.check_hostname)
1060
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1062 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1063 self.assertTrue(ctx.check_hostname)
1064
Christian Heimes61d478c2018-01-27 15:51:38 +01001065 def test_hostname_checks_common_name(self):
1066 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1067 self.assertTrue(ctx.hostname_checks_common_name)
1068 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1069 ctx.hostname_checks_common_name = True
1070 self.assertTrue(ctx.hostname_checks_common_name)
1071 ctx.hostname_checks_common_name = False
1072 self.assertFalse(ctx.hostname_checks_common_name)
1073 ctx.hostname_checks_common_name = True
1074 self.assertTrue(ctx.hostname_checks_common_name)
1075 else:
1076 with self.assertRaises(AttributeError):
1077 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001078
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    version = ssl.TLSVersion
    lowest = version.MINIMUM_SUPPORTED
    highest = version.MAXIMUM_SUPPORTED

    def check_bounds(context, low, high):
        # Both ends of the allowed protocol range must match.
        self.assertEqual(context.minimum_version, low)
        self.assertEqual(context.maximum_version, high)

    # A generic server context starts with the widest range.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    check_bounds(ctx, lowest, highest)

    # Explicit versions are read back unchanged.
    ctx.minimum_version = version.TLSv1_1
    ctx.maximum_version = version.TLSv1_2
    check_bounds(ctx, version.TLSv1_1, version.TLSv1_2)

    ctx.minimum_version = lowest
    ctx.maximum_version = version.TLSv1
    check_bounds(ctx, lowest, version.TLSv1)

    ctx.maximum_version = highest
    self.assertEqual(ctx.maximum_version, highest)

    # Assigning the "opposite" sentinel reads back as a concrete version.
    ctx.maximum_version = lowest
    self.assertIn(ctx.maximum_version, {version.TLSv1, version.SSLv3})

    ctx.minimum_version = highest
    self.assertIn(ctx.minimum_version, {version.TLSv1_2, version.TLSv1_3})

    # Values that are not TLSVersion members are rejected.
    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context pinned to one protocol reports the sentinels and refuses
    # any change to the range.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
    check_bounds(ctx, lowest, highest)
    with self.assertRaises(ValueError):
        ctx.minimum_version = lowest
    with self.assertRaises(ValueError):
        ctx.maximum_version = version.TLSv1
1140
1141
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    assignable = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in assignable:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1161
def test_load_cert_chain(self):
    pem_error = "PEM lib"

    # Combined key and cert in a single file
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for keyfile in (None, CERTFILE):
        ctx.load_cert_chain(CERTFILE, keyfile=keyfile)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for broken in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(broken)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, pem_error):
            ctx.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, pem_error):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: the password may be given as str,
    # bytes or bytearray, by keyword or positionally.
    def password_forms():
        # Fresh objects on every call, so no bytearray is reused.
        return (
            KEY_PASSWORD,
            KEY_PASSWORD.encode(),
            bytearray(KEY_PASSWORD.encode()),
        )

    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    def getpass_exception():
        raise Exception('getpass error')

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        GetPassCallable(),
        GetPassCallable().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001244
def test_load_verify_locations(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given as str or bytes, positionally or by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(cafile)
        ctx.load_verify_locations(cafile=cafile, capath=None)
    # At least one location is required.
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1263
def test_load_verify_cadata(self):
    # Feed certificates to load_verify_locations() through ``cadata``, as
    # PEM text and as DER bytes, and check the number of CA certificates
    # reported by cert_store_stats() after each call.
    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # PEM, one certificate per call.  The last call passes a certificate
    # that is already in the hash table, so the count must not change.
    context = new_context()
    assert_ca_count(context, 0)
    pem_steps = ((cacert_pem, 1), (neuronio_pem, 2), (neuronio_pem, 2))
    for pem, expected in pem_steps:
        context.load_verify_locations(cadata=pem)
        assert_ca_count(context, expected)

    # PEM, both certificates joined into a single string
    context = new_context()
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=both_pem)
    assert_ca_count(context, 2)

    # PEM with junk before, between and after the certificates
    context = new_context()
    noisy = ["head", cacert_pem, "other", neuronio_pem, "again",
             neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy))
    assert_ca_count(context, 2)

    # DER, one certificate per call
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # DER, both certificates concatenated
    context = new_context()
    both_der = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=both_der)
    assert_ca_count(context, 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1320
1321
def test_load_dh_params(self):
    # load_dh_params() takes a path to a DH parameter file; check the
    # accepted argument types and the errors raised for bad input.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    if os.name != 'nt':
        # The bytes path is only exercised outside Windows.
        ctx.load_dh_params(BYTES_DHFILE)
    self.assertRaises(TypeError, ctx.load_dh_params)
    self.assertRaises(TypeError, ctx.load_dh_params, None)
    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # A certificate file is not a DH parameter file.  Only the exception
    # type is checked here, so the exception is not bound to a name.
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1334
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context reports zero for every session counter,
    # whatever the protocol.
    counters = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), dict.fromkeys(counters, 0))
1352
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001353 def test_set_default_verify_paths(self):
1354 # There's not much we can do to test that it acts as expected,
1355 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001356 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001357 ctx.set_default_verify_paths()
1358
Antoine Pitrou501da612011-12-21 09:27:41 +01001359 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001360 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001361 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001362 ctx.set_ecdh_curve("prime256v1")
1363 ctx.set_ecdh_curve(b"prime256v1")
1364 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1365 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1366 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1367 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1368
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass

    # Both None and a real callable are accepted.
    context.set_servername_callback(None)
    context.set_servername_callback(dummycallback)
1383
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback reference the context,
    # while the context references the callback once it is installed.
    def dummycallback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(dummycallback)
    context_ref = weakref.ref(context)
    # Drop the only strong references held by this frame.
    del context, dummycallback
    gc.collect()
    self.assertIs(context_ref(), None)
1396
def test_cert_store_stats(self):
    # Follow cert_store_stats() as certificates are loaded into a context.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509, x509_ca):
        expected = {'x509_ca': x509_ca, 'crl': 0, 'x509': x509}
        self.assertEqual(context.cert_store_stats(), expected)

    assert_stats(x509=0, x509_ca=0)
    # Loading the context's own certificate chain leaves the store empty.
    context.load_cert_chain(CERTFILE)
    assert_stats(x509=0, x509_ca=0)
    context.load_verify_locations(CERTFILE)
    assert_stats(x509=1, x509_ca=0)
    context.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509=2, x509_ca=1)
1410
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # The expected certificate has the same issuer and subject.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    # With a true argument the certificates come back DER-encoded.
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    self.assertEqual(context.get_ca_certs(True), [cacert_der])
1438
Christian Heimes72d28502013-11-23 13:56:58 +01001439 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001440 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001441 ctx.load_default_certs()
1442
Christian Heimesa170fa12017-09-15 20:27:30 +02001443 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001444 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1445 ctx.load_default_certs()
1446
Christian Heimesa170fa12017-09-15 20:27:30 +02001447 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001448 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1449
Christian Heimesa170fa12017-09-15 20:27:30 +02001450 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001451 self.assertRaises(TypeError, ctx.load_default_certs, None)
1452 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1453
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # SSL_CERT_DIR / SSL_CERT_FILE are set only for the duration of the
    # load_default_certs() call; the store is inspected afterwards.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected)
1463
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what load_default_certs() puts in the store without the
    # SSL_CERT_* environment variables being set by this test.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    expected = baseline_context.cert_store_stats()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        # One more X.509 certificate than the baseline is expected.
        expected["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected)
1478
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set on *ctx*.  The remaining flags are
    # only checked when their module-level constant is non-zero.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1493
def test_create_default_context(self):
    # Contexts built by create_default_context() use PROTOCOL_TLS and
    # carry the hardened option set; the verify mode depends on purpose.
    def assert_context(context, verify_mode, hostname_checked=False):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked:
            self.assertTrue(context.check_hostname)
        self._assert_context_options(context)

    # Default purpose
    assert_context(ssl.create_default_context(), ssl.CERT_REQUIRED,
                   hostname_checked=True)

    # Explicit CA material through all three arguments at once
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_pem)
    assert_context(context, ssl.CERT_REQUIRED)

    # Server-side context
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    assert_context(context, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001514
def test__create_stdlib_context(self):
    # hostname_checked: True/False asserts check_hostname accordingly,
    # None leaves it unchecked.
    def assert_context(context, protocol, verify_mode,
                       hostname_checked=None):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked is not None:
            if hostname_checked:
                self.assertTrue(context.check_hostname)
            else:
                self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    # Defaults
    context = ssl._create_stdlib_context()
    assert_context(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE,
                   hostname_checked=False)

    # Explicit protocol
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    assert_context(context, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol plus verification settings
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    assert_context(context, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
                   hostname_checked=True)

    # Server-side purpose
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    assert_context(context, ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001539
Christian Heimes1aa9a752013-12-02 02:41:19 +01001540 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001541 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001543 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001544
Christian Heimese82c0342017-09-15 20:29:57 +02001545 # Auto set CERT_REQUIRED
1546 ctx.check_hostname = True
1547 self.assertTrue(ctx.check_hostname)
1548 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1549 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001550 ctx.verify_mode = ssl.CERT_REQUIRED
1551 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001552 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001553
Christian Heimese82c0342017-09-15 20:29:57 +02001554 # Changing verify_mode does not affect check_hostname
1555 ctx.check_hostname = False
1556 ctx.verify_mode = ssl.CERT_NONE
1557 ctx.check_hostname = False
1558 self.assertFalse(ctx.check_hostname)
1559 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1560 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001561 ctx.check_hostname = True
1562 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001563 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1564
1565 ctx.check_hostname = False
1566 ctx.verify_mode = ssl.CERT_OPTIONAL
1567 ctx.check_hostname = False
1568 self.assertFalse(ctx.check_hostname)
1569 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1570 # keep CERT_OPTIONAL
1571 ctx.check_hostname = True
1572 self.assertTrue(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001574
1575 # Cannot set CERT_NONE with check_hostname enabled
1576 with self.assertRaises(ValueError):
1577 ctx.verify_mode = ssl.CERT_NONE
1578 ctx.check_hostname = False
1579 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001580 ctx.verify_mode = ssl.CERT_NONE
1581 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001582
Christian Heimes5fe668c2016-09-12 00:01:11 +02001583 def test_context_client_server(self):
1584 # PROTOCOL_TLS_CLIENT has sane defaults
1585 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1586 self.assertTrue(ctx.check_hostname)
1587 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1588
1589 # PROTOCOL_TLS_SERVER has different but also sane defaults
1590 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1591 self.assertFalse(ctx.check_hostname)
1592 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1593
Christian Heimes4df60f12017-09-15 20:26:05 +02001594 def test_context_custom_class(self):
1595 class MySSLSocket(ssl.SSLSocket):
1596 pass
1597
1598 class MySSLObject(ssl.SSLObject):
1599 pass
1600
1601 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1602 ctx.sslsocket_class = MySSLSocket
1603 ctx.sslobject_class = MySSLObject
1604
1605 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1606 self.assertIsInstance(sock, MySSLSocket)
1607 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1608 self.assertIsInstance(obj, MySSLObject)
1609
Antoine Pitrou152efa22010-05-16 18:19:27 +00001610
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001611class SSLErrorTests(unittest.TestCase):
1612
1613 def test_str(self):
1614 # The str() of a SSLError doesn't include the errno
1615 e = ssl.SSLError(1, "foo")
1616 self.assertEqual(str(e), "foo")
1617 self.assertEqual(e.errno, 1)
1618 # Same for a subclass
1619 e = ssl.SSLZeroReturnError(1, "foo")
1620 self.assertEqual(str(e), "foo")
1621 self.assertEqual(e.errno, 1)
1622
1623 def test_lib_reason(self):
1624 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001625 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001626 with self.assertRaises(ssl.SSLError) as cm:
1627 ctx.load_dh_params(CERTFILE)
1628 self.assertEqual(cm.exception.library, 'PEM')
1629 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1630 s = str(cm.exception)
1631 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1632
1633 def test_subclass(self):
1634 # Check that the appropriate SSLError subclass is raised
1635 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001636 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1637 ctx.check_hostname = False
1638 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001639 with socket.socket() as s:
1640 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001641 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001642 c = socket.socket()
1643 c.connect(s.getsockname())
1644 c.setblocking(False)
1645 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001646 with self.assertRaises(ssl.SSLWantReadError) as cm:
1647 c.do_handshake()
1648 s = str(cm.exception)
1649 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1650 # For compatibility
1651 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1652
1653
Christian Heimes61d478c2018-01-27 15:51:38 +01001654 def test_bad_server_hostname(self):
1655 ctx = ssl.create_default_context()
1656 with self.assertRaises(ValueError):
1657 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1658 server_hostname="")
1659 with self.assertRaises(ValueError):
1660 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1661 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001662 with self.assertRaises(TypeError):
1663 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1664 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001665
1666
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO: read/write, the eof flag, the pending
    # counter and the argument types accepted by write().

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # A single write is read back whole, then nothing is left.
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are read back concatenated.
        for piece in (b'foo', b'bar'):
            buf.write(piece)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()

        def assert_read(size, expected, at_eof):
            # size None means read() without argument.
            data = buf.read() if size is None else buf.read(size)
            self.assertEqual(data, expected)
            if at_eof:
                self.assertTrue(buf.eof)
            else:
                self.assertFalse(buf.eof)

        self.assertFalse(buf.eof)
        assert_read(None, b'', at_eof=False)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        # eof stays false while written data is still unread.
        self.assertFalse(buf.eof)
        assert_read(2, b'fo', at_eof=False)
        assert_read(1, b'o', at_eof=True)
        assert_read(None, b'', at_eof=True)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Each one-byte read lowers the pending count by one...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ...and each one-byte write raises it by one.
        for written in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, written)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        cases = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in cases:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        for invalid in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(invalid)
1728
1729
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # Calling ssl.SSLObject directly must be rejected with TypeError.
        # The same MemoryBIO is deliberately passed for both arguments.
        memory_bio = ssl.MemoryBIO()
        self.assertRaisesRegex(TypeError, "public constructor",
                               ssl.SSLObject, memory_bio, memory_bio)
1735
1736
Martin Panter3840b2a2016-03-27 01:53:46 +00001737class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001738 """Tests that connect to a simple server running in the background"""
1739
def setUp(self):
    # Start an echo server for each test and make sure it is shut down
    # again, whatever the outcome of the test.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001745
def test_connect(self):
    # Without verification the peer certificate comes back as an empty dict.
    plain = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain, cert_reqs=ssl.CERT_NONE) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
        self.assertFalse(client.server_side)

    # this should succeed because we specify the root cert
    plain = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain,
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as client:
        client.connect(self.server_addr)
        self.assertTrue(client.getpeercert())
        self.assertFalse(client.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001760
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001770
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    status = client.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001779
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [client], [], 5.0)
    # Non-blocking handshake: retry until it completes, waiting for the
    # socket to become readable or writable as the SSL layer requests.
    handshake_done = False
    while not handshake_done:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001805
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
    # Same with a server hostname
    plain = socket.socket(socket.AF_INET)
    with context.wrap_socket(plain, server_hostname="dummy") as client:
        client.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)
1823
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001834
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The same scenario is run with a str and with a bytes `capath`.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001857
def test_connect_cadata(self):
    # Verify the server certificate against the signing CA supplied
    # through `cadata`, first as PEM text and then as DER bytes.
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    ca_der = ssl.PEM_cert_to_DER_cert(ca_pem)
    for cadata in (ca_pem, ca_der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
            client.connect(self.server_addr)
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001878
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Make sure the socket is released even if one of the steps below
    # fails before the explicit close(); closing it a second time from
    # the cleanup is harmless.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001897
def test_non_blocking_handshake(self):
    # Drive a TLS handshake by hand on a non-blocking socket, waiting with
    # select() whenever OpenSSL asks for more input or output.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            # Block until the peer has sent us something.
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            # Block until the socket can accept more outgoing data.
            select.select([], [tls_sock], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001918
def test_get_server_certificate(self):
    # Fetch the background server's certificate, without and then with
    # SIGNING_CA as the trust anchor (see _test_get_server_certificate).
    address = self.server_addr
    _test_get_server_certificate(self, *address, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001921
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    address = self.server_addr
    _test_get_server_certificate_fail(self, *address)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001926
def test_ciphers(self):
    # Well-known cipher strings must allow a connection to be made.
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)

    # Error checking can happen at instantiation or when connecting
    bogus_spec = "^$:,;?*'dorothyx"
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            conn = test_wrap_socket(raw_sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers=bogus_spec)
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001940
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    # Nothing has been pulled from the directory before a handshake.
    self.assertEqual(context.get_ca_certs(), [])

    raw_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)

    # The handshake is expected to have loaded exactly one CA certificate.
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001952
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    contexts = []
    for _ in range(2):
        fresh = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        fresh.load_verify_locations(capath=CAPATH)
        contexts.append(fresh)
    initial_ctx, replacement_ctx = contexts

    raw_sock = socket.socket(socket.AF_INET)
    with initial_ctx.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level socket and the low-level object must
        # report the context the socket was created with...
        self.assertIs(conn.context, initial_ctx)
        self.assertIs(conn._sslobj.context, initial_ctx)
        # ...and follow along when the context is swapped.
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001968
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run ``func(*args)`` to completion, shuttling data for the BIOs.

    A simple IO loop: ``func`` is retried for as long as it raises an
    ``ssl.SSLError`` whose errno is WANT_READ or WANT_WRITE.  After every
    call, whatever is in the *outgoing* BIO is sent on *sock*; on
    WANT_READ, data received from *sock* is fed to the *incoming* BIO
    (or EOF is signalled when the peer sent nothing).

    Only the ``timeout`` keyword (seconds, default 10) is read from
    *kwargs*; keyword arguments are not forwarded to *func*.  Any other
    ``ssl.SSLError`` propagates.  Returns the value returned by *func*.
    """
    timeout = kwargs.get('timeout', 10)
    deadline = time.monotonic() + timeout
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        pending = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            pending = exc.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        sock.sendall(outgoing.read())
        # If there's no error, we're done.
        if pending is None:
            break
        # For WANT_READ, we need to get data from the socket and put it in
        # the incoming BIO.
        if pending == ssl.SSL_ERROR_WANT_READ:
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return ret
2005
def test_bio_handshake(self):
    # Handshake and shutdown over memory BIOs, checking what the SSL
    # object reports before and after the handshake.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing,
                              server_side=False,
                              server_hostname=SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # State once the session is established.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2039
def test_bio_read_write_data(self):
    # Round-trip one request through the echo server over memory BIOs;
    # the reply is expected to be the lower-cased request.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    # Every step goes through the same socket/BIO plumbing.
    pump = functools.partial(self.ssl_io_loop, sock, incoming, outgoing)
    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002055
2056
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each body runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            tls_sock = test_wrap_socket(socket.socket(socket.AF_INET),
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        do_handshake_on_connect=False)
            self.addCleanup(tls_sock.close)
            # A timeout this small is meant to expire before the connection
            # can complete.
            tls_sock.settimeout(1e-07)
            status = tls_sock.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            expected_codes = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(status, expected_codes)

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Same certificate checks as the local-server tests, against an
        # IPv6-only host name.
        remote = 'ipv6.google.com'
        with support.transient_internet(remote):
            _test_get_server_certificate(self, remote, 443)
            _test_get_server_certificate_fail(self, remote, 443)
2078
Martin Panter3840b2a2016-03-27 01:53:46 +00002079
def _test_get_server_certificate(test, host, port, cert=None):
    """Check that ssl.get_server_certificate() returns a certificate.

    The certificate of ``(host, port)`` is requested twice: first with no
    extra arguments, then with ``ca_certs=cert``.  An empty result on
    either call fails *test*.  In verbose mode the second result is
    written to stdout.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2090
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA fails.

    ssl.get_server_certificate() is expected to raise ssl.SSLError; if it
    returns instead, *test* is failed with the certificate it returned.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2100
2101
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002102from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002103
class ThreadedEchoServer(threading.Thread):
    # Echo server running in its own thread.  Connections are accepted one
    # at a time and each is served by a ConnectionHandler thread that is
    # joined before the next accept().
    #
    # NOTE(review): the text this class was recovered from had lost its
    # indentation; block nesting was reconstructed from the control flow
    # and should be confirmed against the canonical file.

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Stays None until wrap_conn() has succeeded.
            self.sslconn = None
            super().__init__()
            self.daemon = True

        def _note_bad_connection(self, exc):
            # Shared bookkeeping for a failed wrap_conn(); must be called
            # from inside the ``except`` block handling *exc*.
            #
            # bpo-31323: Store the exception as string to prevent
            # a reference leak: server -> conn_errors -> exception
            # -> traceback -> self (ConnectionHandler) -> server
            self.server.conn_errors.append(str(exc))
            if self.server.chatty:
                handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
            self.running = False

        def _chat(self, text):
            # Per-connection chatter, only shown in verbose mode.
            if support.verbose and self.server.connectionchatty:
                sys.stdout.write(text)

        def wrap_conn(self):
            """Wrap the plain socket with the server's SSL context.

            Returns True on success.  On failure the error text is stored
            in ``server.conn_errors``, the connection is closed and False
            is returned.
            """
            server = self.server
            try:
                self.sslconn = server.context.wrap_socket(
                    self.sock, server_side=True)
                server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as exc:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                self._note_bad_connection(exc)
                self.close()
                return False
            except (ssl.SSLError, OSError) as exc:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                self._note_bad_connection(exc)
                # Unlike the handler above, this one also stops the server.
                server.stop()
                self.close()
                return False

            # Handshake succeeded: record and optionally report the details.
            chatty = support.verbose and server.chatty
            server.shared_ciphers.append(self.sslconn.shared_ciphers())
            if server.context.verify_mode == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            # Read from the TLS layer when wrapped, else from the plain socket.
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            # Write through the TLS layer when wrapped, else the plain socket.
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            # Close whichever socket object is currently in use.
            if not self.sslconn:
                self.sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Serve one connection: echo lower-cased data, obey commands."""
            server = self.server
            self.running = True
            # Without STARTTLS the connection is encrypted from the start.
            if not server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    msg = self.read()
                    command = msg.strip()
                    if not command:
                        # eof, so quit this handler
                        self.running = False
                        # NOTE(review): self.sslconn may be None here in
                        # STARTTLS mode before any upgrade -- confirm.
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif command == b'over':
                        self._chat(" server: client closed connection\n")
                        self.close()
                        return
                    elif server.starttls_server and command == b'STARTTLS':
                        self._chat(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (server.starttls_server and self.sslconn
                          and command == b'ENDTLS'):
                        self._chat(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        self._chat(" server: connection is now unencrypted...\n")
                    elif command == b'CB tls-unique':
                        self._chat(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        # Anything else is echoed back lower-cased.
                        if support.verbose and server.connectionchatty:
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(self.addr))
                    self.close()
                    self.running = False
                except OSError:
                    if server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    # NOTE(review): placement inside this handler is
                    # reconstructed -- confirm.
                    server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        # A ready-made *context* takes precedence; otherwise one is built
        # from the individual arguments.
        # NOTE(review): all optional settings below are taken to belong to
        # the ``else`` branch -- confirm.
        if context:
            self.context = context
        else:
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLS_SERVER
            self.context = ssl.SSLContext(ssl_version)
            if certreqs is None:
                certreqs = ssl.CERT_NONE
            self.context.verify_mode = certreqs
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        # Listening socket bound via support.bind_port().
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Results gathered by the connection handlers.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        super().__init__()
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait until it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        # *flag*, when given, is set by run() once the server is listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        listener = self.sock
        # The short timeout lets the loop notice stop() promptly.
        listener.settimeout(0.05)
        listener.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                conn, peer = listener.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(peer) + '\n')
                handler = self.ConnectionHandler(self, conn, peer)
                handler.start()
                # Connections are served strictly one after another.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as exc:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(exc) + '\n')

        listener.close()

    def stop(self):
        # Makes the accept loop in run() exit after its current iteration.
        self.active = False
2348
class AsyncoreEchoServer(threading.Thread):

    # this one's based on asyncore.dispatcher
    #
    # NOTE(review): the text this class was recovered from had lost its
    # indentation; block nesting was reconstructed and should be confirmed
    # against the canonical file.

    class EchoServer (asyncore.dispatcher):

        class ConnectionHandler(asyncore.dispatcher_with_send):

            def __init__(self, conn, certfile):
                # The handshake is not done on wrapping; it is driven by
                # _do_ssl_handshake(), first here and then from handle_read().
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Consume data the SSL layer already holds before reporting
                # readiness.
                tls_sock = self.socket
                if isinstance(tls_sock, ssl.SSLSocket):
                    while tls_sock.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # One handshake step; clears _ssl_accepting once it completes.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    # Other OSErrors are ignored without finishing the
                    # handshake.
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                payload = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(payload))
                if payload:
                    # Echo the data back lower-cased.
                    self.send(payload.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler is only instantiated here, no reference is kept.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        super().__init__()
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and wait until run() has signalled the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        # *flag*, when given, is set by run() as it starts.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Deliberately best-effort: any error raised by the loop is
            # ignored and the loop is simply restarted until stop().
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002466
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must
    equal ``indata.lower()`` or AssertionError is raised.  Returns a dict
    of connection statistics collected from the client socket and from
    the server.
    """
    # NOTE(review): the server is always created with connectionchatty=False;
    # the *connectionchatty* argument only affects the client-side output.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    stats = {}
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name,
                                            session=session)
        with client as s:
            s.connect((HOST, server.port))
            expected = indata.lower()
            for arg in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    # The message shows *indata*, not the variant being sent.
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo server that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002516
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.

    *certsreqs* defaults to ssl.CERT_NONE and is applied to both sides;
    *server_options* and *client_options* are OR-ed into the respective
    context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # Looked up unconditionally, so an unknown value raises KeyError here.
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (client_name, server_name))
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2575
2576
2577class ThreadedTests(unittest.TestCase):
2578
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Every generic protocol, using one context for both ends.  The
    # role-specific protocols are covered separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol not in role_specific:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(CERTFILE)
                server_params_test(context, context,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Matching roles: the connection must succeed.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Wrong roles: each row is (client label, server label, context used
    # as client, context used as server, extra keyword arguments).
    # NOTE(review): the last row is labelled client=PROTOCOL_TLS_CLIENT but
    # passes server_context as the client, like the first row -- confirm
    # whether that is intended.
    wrong_roles = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in wrong_roles:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(caught.exception))
2626
def test_getpeercert(self):
    # getpeercert() must raise ValueError before the handshake and return
    # a populated certificate dict afterwards.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    expected_org = (('organizationName', 'Python Software Foundation'),)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # The handshake was deferred, so no certificate is available yet.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()

        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')

        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # The validity window must be present and correctly ordered.
        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, cert)
        not_before = ssl.cert_time_to_seconds(cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002662
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Walks through three client configurations: default verify flags,
    # CRL checking without a CRL loaded, and CRL checking with one loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # VERIFY_X509_TRUSTED_FIRST may be missing from the ssl module;
    # treat it as 0 in that case.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # 1. VERIFY_DEFAULT should pass.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # 2. VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails.
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s, \
            self.assertRaisesRegex(ssl.SSLError,
                                   "certificate verify failed"):
        s.connect((HOST, server.port))

    # 3. Now load a CRL file (signed by the CA); verification passes again.
    client_context.load_verify_locations(CRLFILE)
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002704
def test_check_hostname(self):
    # Hostname checking: a matching name verifies, a wrong name raises
    # CertificateError, and omitting the name is rejected at wrap time.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # Correct hostname should verify.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        peer_cert = s.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # Incorrect hostname should raise an exception.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as s, \
            self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
        s.connect((HOST, server.port))

    # Missing server_hostname argument should cause an exception, too.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            socket.socket() as plain, \
            self.assertRaisesRegex(ValueError,
                                   "check_hostname requires server_hostname"):
        client_context.wrap_socket(plain)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002737
def test_ecc_cert(self):
    # Connect to a server that only has an ECC certificate, using a client
    # restricted to ECDHE/ECDSA cipher suites, and check what was negotiated.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            # The original used assertTrue(cipher[:2], (...)), which treats
            # the expected value as the failure message and always passes.
            # TLS 1.3 suite names (e.g. TLS_AES_256_GCM_SHA384) do not
            # encode key exchange or authentication, and set_ciphers()
            # cannot restrict them, so only check older protocol versions.
            if s.version() != 'TLSv1.3':
                cipher = s.cipher()[0].split('-')
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2758
def test_dual_rsa_ecc(self):
    # A server holding both an ECC and an RSA key/cert pair must pick the
    # ECC one for a client that only offers ECDSA-authenticated suites.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    # algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The original used assertTrue(cipher[:2], (...)), which treats
            # the expected value as the failure message and always passes.
            # str.split() returns a list, so compare against a list.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2784
def test_check_hostname_idn(self):
    # Hostname verification against a certificate with internationalised
    # names, given as Unicode text, as an ASCII "xn--" str and as bytes.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # The ASCII form that SSLSocket.server_hostname must report.
    koenig = 'xn--knig-5qa.idn.pythontest.net'
    idna2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'

    # (server_hostname argument, expected s.server_hostname)
    # The Unicode spelling of the idna2008 name
    # ('königsgäßchen.idna2008.pythontest.net') is disabled in the original
    # list and is therefore not exercised here either -- presumably it
    # cannot be encoded to the expected name; confirm before enabling.
    idn_hostnames = (
        ('könig.idn.pythontest.net', koenig),
        (koenig, koenig),
        (koenig.encode('ascii'), koenig),

        ('königsgäßchen.idna2003.pythontest.net', idna2003),
        (idna2003, idna2003),
        (idna2003.encode('ascii'), idna2003),

        (idna2008, idna2008),
        (idna2008.encode('ascii'), idna2008),
    )

    # Correct hostname should verify, however it was specified.
    for requested, expected in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=requested) as s:
            self.assertEqual(s.server_hostname, expected)
            s.connect((HOST, server.port))
            peer_cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected)
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # Incorrect hostname should raise an exception.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as s, \
            self.assertRaises(ssl.CertificateError):
        s.connect((HOST, server.port))
2840
def test_wrong_cert_tls12(self):
    """A server requiring client certs must reject an untrusted one.

    The server runs with CERT_REQUIRED and the client presents a
    certificate that is not signed by the trusted CA.  The connection
    is capped at TLS 1.2, so the failure is expected during connect().
    """
    client_context, server_context, hostname = testing_context()
    # TLS 1.3 has a different handshake.
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2
    # Client cert that is not signed by the trusted CA.
    client_context.load_cert_chain(CERTFILE)
    # Require TLS client authentication.
    server_context.verify_mode = ssl.CERT_REQUIRED

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        try:
            s.connect((HOST, server.port))
        except OSError as exc:
            # Accept either an SSL error about the server rejecting the
            # connection, or a low-level connection reset (which
            # sometimes happens on Windows).  Anything else propagates.
            if isinstance(exc, ssl.SSLError):
                report = "\nSSLError is %r\n"
            elif exc.errno == errno.ECONNRESET:
                report = "\nsocket.error is %r\n"
            else:
                raise
            if support.verbose:
                sys.stdout.write(report % exc)
        else:
            self.fail("Use of invalid cert should have failed!")
2877
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    # TLS 1.3 variant of the wrong-client-certificate test: connect() is
    # expected to succeed and the rejection to surface on the first I/O.
    client_context, server_context, hostname = testing_context()
    # Force TLS 1.3 on both ends.
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    # Client cert that is not signed by the trusted CA.
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        # TLS 1.3 performs the client cert exchange after the handshake.
        s.connect((HOST, server.port))
        try:
            s.write(b'data')
            s.read(4)
        except OSError as exc:
            # An SSL error or a connection reset are both acceptable;
            # anything else propagates.
            if isinstance(exc, ssl.SSLError):
                report = "\nSSLError is %r\n"
            elif exc.errno == errno.ECONNRESET:
                report = "\nsocket.error is %r\n"
            else:
                raise
            if support.verbose:
                sys.stdout.write(report % exc)
        else:
            self.fail("Use of invalid cert should have failed!")
2908
def test_rude_shutdown(self):
    """Wrapping a socket whose peer has already gone must raise OSError.

    A helper thread accepts one plain TCP connection and closes it
    immediately; the main thread then tries to wrap its end in SSL.
    """
    ready = threading.Event()   # set once the listener is listening
    gone = threading.Event()    # set once the listener closed its sockets

    listening = socket.socket()
    port = support.bind_port(listening, HOST)

    def accept_then_close():
        # Runs in the helper thread: accept a single connection, rudely
        # close it, and tell the main thread that the socket is gone.
        listening.listen()
        ready.set()
        peer, _ = listening.accept()
        peer.close()
        listening.close()
        gone.set()

    worker = threading.Thread(target=accept_then_close)
    worker.start()
    try:
        ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            gone.wait()
            try:
                wrapped = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002949
def test_ssl_cert_verify_error(self):
    # A client with no CA certificates loaded must fail the handshake
    # with an SSLCertVerificationError carrying OpenSSL's verify details.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # No load_verify_locations() call: the server certificate cannot be
    # chained to any trusted issuer.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # The original try/except had no else branch, so the test
            # passed silently when connect() succeeded.  assertRaises
            # makes a missing exception a failure.
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            e = caught.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
2972
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Pair an SSLv2 server with each client protocol and option set."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 on both ends must succeed, with and without an explicit
    # fourth argument (a CERT_* constant).
    try_protocol_combo(sslv2, sslv2, True)
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_reqs)

    # Any other client protocol must be refused.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options.
    client_option_sets = []
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        client_option_sets.append(ssl.OP_NO_SSLv2)
    client_option_sets.extend((ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1))
    for options in client_option_sets:
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=options)
Antoine Pitrou242db722013-05-01 20:52:07 +02002996
@skip_if_broken_ubuntu_ssl
def test_PROTOCOL_TLS(self):
    """Pair a PROTOCOL_TLS (SSLv23) server with each client protocol."""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols, first without a fourth argument, then
    # with CERT_OPTIONAL and finally with CERT_REQUIRED.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, *extra)
        try_protocol_combo(tls, tls, True, *extra)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', *extra)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3035
3036
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Pair an SSLv3 server with each client protocol and option set."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 on both ends must negotiate 'SSLv3', with and without an
    # explicit fourth argument (a CERT_* constant).
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *extra)

    # Every other client configuration must be refused.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3056
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Pair a TLSv1 server with each client protocol and option set."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 on both ends must negotiate 'TLSv1', with and without an
    # explicit fourth argument (a CERT_* constant).
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *extra)

    # Legacy SSL clients, where the build still has them, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)

    # A generic client with TLSv1 switched off cannot connect either.
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3071
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Pair a TLSv1.1 server with each client protocol and option set.

    Also checks TLSv1.1 against older TLS versions in both directions.
    """
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Legacy SSL clients, where the build still has them, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)

    # A generic client with TLSv1.1 switched off cannot connect.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # A generic server settles on TLSv1.1 with a TLSv1.1-only client;
    # TLSv1 and TLSv1.1 never interoperate.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3091
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Pair a TLSv1.2 server with each client protocol and option set.

    Also checks TLSv1.2 against older TLS versions in both directions.
    """
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)

    # Legacy SSL clients, where the build still has them, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)

    # A generic client with TLSv1.2 switched off cannot connect.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # A generic server settles on TLSv1.2 with a TLSv1.2-only client.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')

    # TLSv1.2 never interoperates with TLSv1 or TLSv1.1, either way round.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
3115
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3",
            b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through the TLS wrapper `conn` rather than
    # the plain socket `s`.
    secure = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")

        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            # Send the message over whichever channel is active and
            # read the server's reply from it.
            if secure:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            accepted = msg.startswith(b"ok")

            if indata == b"STARTTLS" and accepted:
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = test_wrap_socket(s)
                secure = True
            elif indata == b"ENDTLS" and accepted:
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                s = conn.unwrap()
                secure = False
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % msg)

        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close whichever channel is active.
        if secure:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003172
def test_socketserver(self):
    """Fetch a file over HTTPS from a socketserver-based test server."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    if support.verbose:
        sys.stdout.write('\n')

    # Reference copy, read straight from disk.
    with open(CERTFILE, 'rb') as local_file:
        d1 = local_file.read()

    # Now fetch the same data from the HTTPS server.
    d2 = ''
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    response = urllib.request.urlopen(url, context=context)
    try:
        dlen = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    finally:
        response.close()
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003198
def test_asyncore_server(self):
    """Exchange one message with the asyncore-based SSL echo server."""
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    # The reply is compared against the lower-cased request.
    expected = indata.lower()

    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003227
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            # The reply is compared against the lower-cased input.
            expected = indata.lower()
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != expected:
                    # "!r" is a conversion; ":r" would be an (invalid)
                    # format spec and make format() itself raise.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            expected = indata.lower()
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != expected:
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # The rejected receive call read nothing, so consume the
                # data sent above before the next iteration.
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        # Negative lengths are rejected by both recv() and read().
        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3363
def test_recv_zero(self):
    # Zero-length reads return an empty result and leave pending data
    # untouched for the next real read.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain_sock = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain_sock.close)
    tls_sock = test_wrap_socket(plain_sock, suppress_ragged_eofs=False)
    self.addCleanup(tls_sock.close)

    # recv/read(0) should return no data
    tls_sock.send(b"data")
    for zero_length_read in (tls_sock.recv, tls_sock.read):
        self.assertEqual(zero_length_read(0), b"")
    self.assertEqual(tls_sock.read(), b"data")

    # Should not block if the other end sends no data
    tls_sock.setblocking(False)
    self.assertEqual(tls_sock.recv(0), b"")
    empty_target = bytearray()
    self.assertEqual(tls_sock.recv_into(empty_target), 0)
3383
def test_nonblocking_send(self):
    # On a non-blocking socket, send() must eventually raise one of the
    # SSLWant* errors rather than block.
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Return to blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
3413
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    # Flipped to True in the outer "finally" to stop the accept loop.
    stop_requested = False

    def accept_without_handshake():
        # Accept plain TCP connections and never speak TLS on them.
        listener.listen()
        listening.set()
        accepted = []
        while not stop_requested:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=accept_without_handshake)
    worker.start()
    listening.wait()

    try:
        # Case 1: connect first, then wrap (handshake happens on wrap).
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect (handshake happens on connect).
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop_requested = True
        worker.join()
        listener.close()
3462
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = context.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    # Filled in by the server thread once a connection was accepted.
    remote = None
    peer = None

    def accept_and_echo():
        nonlocal remote, peer
        listener.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        remote, peer = listener.accept()
        # Echo the first four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=accept_and_echo)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3503
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but unconnected socket fails with
    # ENOTCONN.
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = tls_context.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.getpeercert()
        error_number = caught.exception.errno
        self.assertEqual(error_number, errno.ENOTCONN)
3510
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but unconnected socket fails with
    # ENOTCONN.
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    unconnected = tls_context.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.do_handshake()
        error_number = caught.exception.errno
        self.assertEqual(error_number, errno.ENOTCONN)
3517
def test_no_shared_ciphers(self):
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    client_context.set_ciphers("AES128")
    server_context.set_ciphers("AES256")
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        # The handshake is expected to fail on the client side ...
        with client as s, self.assertRaises(OSError):
            s.connect((HOST, server.port))
    # ... and the server records the reason for the first failed connection.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3531
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            # Before connecting there is neither a version nor an
            # underlying SSL object.
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(negotiated, 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(negotiated, 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(negotiated, ('TLSv1', 'TLSv1.2'))
        # Leaving the "with" block resets both again.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3555
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With every older TLS version switched off, the connection must
    # negotiate TLSv1.3 and one of its cipher suites.
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    context.options |= older_versions_off
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn(cipher_name, tls13_suites)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003573
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    client_context, server_context, hostname = testing_context()
    TLSVersion = ssl.TLSVersion
    # client TLSv1.0 to 1.2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1_2

    # (server minimum, server maximum, version expected to be negotiated)
    overlapping_ranges = (
        # server only TLSv1.2
        (TLSVersion.TLSv1_2, TLSVersion.TLSv1_2, 'TLSv1.2'),
        # client 1.0 to 1.2, server 1.0 to 1.1
        (TLSVersion.TLSv1, TLSVersion.TLSv1_1, 'TLSv1.1'),
    )
    for lowest, highest, negotiated in overlapping_ranges:
        server_context.minimum_version = lowest
        server_context.maximum_version = highest
        with ThreadedEchoServer(context=server_context) as server:
            with client_context.wrap_socket(
                    socket.socket(), server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(s.version(), negotiated)

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = TLSVersion.TLSv1_2
    server_context.maximum_version = TLSVersion.TLSv1_2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            # The failure is reported to the client as a TLS alert.
            self.assertIn("alert", str(e.exception))
3612
3613
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # The server accepts SSLv3 as its minimum and the client is pinned
    # to exactly SSLv3, so SSLv3 must be negotiated.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3627
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3647
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(cb_data), 48)
            else:
                self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the binding of *this* connection; the first
            # connection's data was already validated above.
            self.assertIsNotNone(new_cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(new_cb_data), 48)
            else:
                self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003707
def test_compression(self):
    # Whatever compression is reported must be one of the known values.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    known_values = {None, 'ZLIB', 'RLE'}
    self.assertIn(compression, known_values)
3716
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both ends, no compression is reported.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    self.assertIs(compression, None)
3727
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # stats["cipher"][0] is the negotiated cipher's name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
3743
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    no_old_tls = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    def handshake(client_ctx, server_ctx, sni):
        # Run one client/server exchange with the settings shared by
        # all three scenarios below.
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    handshake(client_context, server_context, hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    handshake(client_context, server_context, hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= no_old_tls
    try:
        handshake(client_context, server_context, hostname)
    except ssl.SSLError:
        # The mismatch was rejected, which is the desired outcome.
        pass
    else:
        # OpenSSL 1.0.2 does not fail although it should.
        if IS_OPENSSL_1_1_0:
            self.fail("mismatch curve did not fail")
3782
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3790
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    # Only the server side advertises protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3800
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    # OpenSSL 1.1.0 to 1.1.0e raises handshake error when no protocol
    # is expected to be selected.
    handshake_error_expected = (
        IS_OPENSSL_1_1_0 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))

    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        if expected is None and handshake_error_expected:
            with self.assertRaises(ssl.SSLError):
                server_params_test(client_context,
                                   server_context,
                                   chatty=True,
                                   connectionchatty=True,
                                   sni_name=hostname)
            continue

        # In every other case an SSLError is a genuine failure: let it
        # propagate with its own traceback instead of storing it and
        # later failing with an unrelated TypeError on subscripting.
        stats = server_params_test(client_context,
                                   server_context,
                                   chatty=True,
                                   connectionchatty=True,
                                   sni_name=hostname)

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # Compare against the last entry of the server-side list.
        server_result = stats['server_alpn_protocols'][-1] \
            if len(stats['server_alpn_protocols']) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3840
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    contexts = testing_context()
    client_context, server_context, hostname = contexts
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
3848
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        # The doubled %% survive the first substitution so the actual
        # result and the side can be filled in per assertion.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s")
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # Compare against the last entry of the server-side list.
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3874
def sni_contexts(self):
    # Build the contexts used by the SNI tests: two server contexts with
    # different certificates and a client context that loads SIGNING_CA
    # for verification.
    def server_context_for(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_context_for(SIGNED_CERTFILE)
    other_context = server_context_for(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3883
def check_common_name(self, stats, name):
    # Assert that the peer certificate stored in *stats* lists *name*
    # as a commonName entry of its subject.
    subject = stats['peercert']['subject']
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, subject)
3887
@needs_sni
def test_sni_callback(self):
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def record_and_switch(ssl_sock, server_name, initial_context):
        # Record every invocation; switch to the other context
        # whenever the client sent a server name.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(record_and_switch)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate loaded into other_context was presented.
    self.check_common_name(stats, 'fakehostname')

    del seen[:]
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3928
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    contexts = self.sni_contexts()
    server_context, other_context, client_context = contexts

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003942
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback fails the connection
    # with a TLS handshake failure alert, and the error is expected to
    # show up on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')

    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003959
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning an unsupported type terminates the
    # TLS connection with an internal error alert; a TypeError is
    # expected to be reported on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')

    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003977
def test_shared_ciphers(self):
    # The client offers AES128 and AES256 while the server only accepts
    # AES256, so every cipher reported as shared must be an AES256 one
    # (or a TLS 1.3 suite).
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")
    # Each shared cipher name must contain at least one of these.
    expected_algs = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _protocol, _bits in shared:
        recognised = any(alg in cipher_name for alg in expected_algs)
        if not recognised:
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003995
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)

    with echo_server:
        raw_sock = socket.socket()
        tls_sock = client_context.wrap_socket(raw_sock,
                                              server_hostname=hostname)
        tls_sock.connect((HOST, echo_server.port))
        tls_sock.close()

        with self.assertRaises(ValueError):
            tls_sock.read(1024)
        with self.assertRaises(ValueError):
            tls_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004008
def test_sendfile(self):
    # Send a small file through SSLSocket.sendfile() to the echo server
    # and check that the same bytes come back.
    TEST_DATA = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(TEST_DATA)
    self.addCleanup(support.unlink, support.TESTFN)

    # A single context is used for both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as src:
            s.sendfile(src)
            echoed = s.recv(1024)
            self.assertEqual(echoed, TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004025
def test_session(self):
    # Run four handshakes against the same server context, alternating
    # between fresh and resumed sessions, and check both the client-side
    # session objects and the server-side accept/hit counters.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One full client/server exchange; returns the stats mapping.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_server_counters(accept, hits):
        # Compare the server context's session statistics with the
        # expected number of accepted connections and cache hits.
        sess_stat = server_context.session_stats()
        self.assertEqual(sess_stat['accept'], accept)
        self.assertEqual(sess_stat['hits'], hits)

    # first connection without session
    stats = handshake()
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    stats = handshake(session=session)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    session2 = stats['session']
    self.assertEqual(session2.id, session.id)
    # Equal to the original session, yet a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    session3 = stats['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    stats = handshake(session=session)
    self.assertTrue(stats['session_reused'])
    session4 = stats['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004083
def test_session_handling(self):
    # Check when the session attribute of an SSL socket may be read and
    # assigned: before and after the handshake, with a wrong type, and
    # with a session that belongs to another context.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment belongs inside assertRaises: it is the
            # statement expected to raise, so anything placed after it
            # would never run and would only widen what can satisfy the
            # assertion.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004133
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004134
def test_main(verbose=False):
    """Run the SSL test case classes, printing a banner in verbose mode.

    The *verbose* argument is accepted but never consulted; the banner is
    controlled by ``support.verbose``.  NetworkedTests is added only when
    the 'network' resource is enabled.

    Raises support.TestFailed if any of the certificate fixture files
    does not exist.
    """
    if support.verbose:
        import warnings
        # platform.linux_distribution() was removed in Python 3.8.  Probe
        # it only where it still exists, so that building this table
        # cannot raise AttributeError on newer interpreters.
        plats = {}
        linux_distribution = getattr(platform, 'linux_distribution', None)
        if linux_distribution is not None:
            plats['Linux'] = linux_distribution
        plats['Mac'] = platform.mac_ver
        plats['Windows'] = platform.win32_ver
        with warnings.catch_warnings():
            # Silence the deprecation warning emitted by
            # linux_distribution() where it is still available.
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                DeprecationWarning,
            )
            # Use the first probe that reports a non-empty first field;
            # fall back to the generic platform string otherwise.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by every ssl build.
            pass

    # Fail early, with a clear message, when a fixture file is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004188
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()