blob: 4d5925a9355ec0f3edfaf32420915b92f9d1912e [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Miss Islington (bot)e3228a32018-08-14 10:52:27 -0400120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou23df4832010-08-04 17:14:06 +0000183# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
184def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200185 if hasattr(ssl, 'PROTOCOL_SSLv2'):
186 @functools.wraps(func)
187 def f(*args, **kwargs):
188 try:
189 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
190 except ssl.SSLError:
191 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
192 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
193 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
194 return func(*args, **kwargs)
195 return f
196 else:
197 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100199needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
200
Antoine Pitrou23df4832010-08-04 17:14:06 +0000201
Christian Heimesd0486372016-09-10 23:23:33 +0200202def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
203 cert_reqs=ssl.CERT_NONE, ca_certs=None,
204 ciphers=None, certfile=None, keyfile=None,
205 **kwargs):
206 context = ssl.SSLContext(ssl_version)
207 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200208 if cert_reqs == ssl.CERT_NONE:
209 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200210 context.verify_mode = cert_reqs
211 if ca_certs is not None:
212 context.load_verify_locations(ca_certs)
213 if certfile is not None or keyfile is not None:
214 context.load_cert_chain(certfile, keyfile)
215 if ciphers is not None:
216 context.set_ciphers(ciphers)
217 return context.wrap_socket(sock, **kwargs)
218
Christian Heimesa170fa12017-09-15 20:27:30 +0200219
220def testing_context(server_cert=SIGNED_CERTFILE):
221 """Create context
222
223 client_context, server_context, hostname = testing_context()
224 """
225 if server_cert == SIGNED_CERTFILE:
226 hostname = SIGNED_CERTFILE_HOSTNAME
227 elif server_cert == SIGNED_CERTFILE2:
228 hostname = SIGNED_CERTFILE2_HOSTNAME
229 else:
230 raise ValueError(server_cert)
231
232 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
233 client_context.load_verify_locations(SIGNING_CA)
234
235 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
236 server_context.load_cert_chain(server_cert)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800237 client_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200238
239 return client_context, server_context, hostname
240
241
Antoine Pitrou152efa22010-05-16 18:19:27 +0000242class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000243
Antoine Pitrou480a1242010-04-28 21:37:09 +0000244 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000245 ssl.CERT_NONE
246 ssl.CERT_OPTIONAL
247 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100248 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100249 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100250 if ssl.HAS_ECDH:
251 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100252 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
253 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000254 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100255 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700256 ssl.OP_NO_SSLv2
257 ssl.OP_NO_SSLv3
258 ssl.OP_NO_TLSv1
259 ssl.OP_NO_TLSv1_3
260 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
261 ssl.OP_NO_TLSv1_1
262 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200263 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000264
Christian Heimes89c20512018-02-27 11:17:32 +0100265 def test_private_init(self):
266 with self.assertRaisesRegex(TypeError, "public constructor"):
267 with socket.socket() as s:
268 ssl.SSLSocket(s)
269
Antoine Pitrou172f0252014-04-18 20:33:08 +0200270 def test_str_for_enums(self):
271 # Make sure that the PROTOCOL_* constants have enum-like string
272 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200273 proto = ssl.PROTOCOL_TLS
274 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200275 ctx = ssl.SSLContext(proto)
276 self.assertIs(ctx.protocol, proto)
277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n RAND_status is %d (%s)\n"
282 % (v, (v and "sufficient randomness") or
283 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200284
285 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
286 self.assertEqual(len(data), 16)
287 self.assertEqual(is_cryptographic, v == 1)
288 if v:
289 data = ssl.RAND_bytes(16)
290 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200291 else:
292 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200293
Victor Stinner1e81a392013-12-19 16:47:04 +0100294 # negative num is invalid
295 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
296 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
297
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100298 if hasattr(ssl, 'RAND_egd'):
299 self.assertRaises(TypeError, ssl.RAND_egd, 1)
300 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000301 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200302 ssl.RAND_add(b"this is a random bytes object", 75.0)
303 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000304
Christian Heimesf77b4b22013-08-21 13:26:05 +0200305 @unittest.skipUnless(os.name == 'posix', 'requires posix')
306 def test_random_fork(self):
307 status = ssl.RAND_status()
308 if not status:
309 self.fail("OpenSSL's PRNG has insufficient randomness")
310
311 rfd, wfd = os.pipe()
312 pid = os.fork()
313 if pid == 0:
314 try:
315 os.close(rfd)
316 child_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(child_random), 16)
318 os.write(wfd, child_random)
319 os.close(wfd)
320 except BaseException:
321 os._exit(1)
322 else:
323 os._exit(0)
324 else:
325 os.close(wfd)
326 self.addCleanup(os.close, rfd)
327 _, status = os.waitpid(pid, 0)
328 self.assertEqual(status, 0)
329
330 child_random = os.read(rfd, 16)
331 self.assertEqual(len(child_random), 16)
332 parent_random = ssl.RAND_pseudo_bytes(16)[0]
333 self.assertEqual(len(parent_random), 16)
334
335 self.assertNotEqual(child_random, parent_random)
336
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -0700337 maxDiff = None
338
Antoine Pitrou480a1242010-04-28 21:37:09 +0000339 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000340 # note that this uses an 'unofficial' function in _ssl.c,
341 # provided solely for this test, to exercise the certificate
342 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100343 self.assertEqual(
344 ssl._ssl._test_decode_cert(CERTFILE),
345 CERTFILE_INFO
346 )
347 self.assertEqual(
348 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
349 SIGNED_CERTFILE_INFO
350 )
351
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200352 # Issue #13034: the subjectAltName in some certificates
353 # (notably projects.developer.nokia.com:443) wasn't parsed
354 p = ssl._ssl._test_decode_cert(NOKIACERT)
355 if support.verbose:
356 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
357 self.assertEqual(p['subjectAltName'],
358 (('DNS', 'projects.developer.nokia.com'),
359 ('DNS', 'projects.forum.nokia.com'))
360 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100361 # extra OCSP and AIA fields
362 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
363 self.assertEqual(p['caIssuers'],
364 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
365 self.assertEqual(p['crlDistributionPoints'],
366 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000367
Christian Heimes824f7f32013-08-17 00:54:47 +0200368 def test_parse_cert_CVE_2013_4238(self):
369 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
370 if support.verbose:
371 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
372 subject = ((('countryName', 'US'),),
373 (('stateOrProvinceName', 'Oregon'),),
374 (('localityName', 'Beaverton'),),
375 (('organizationName', 'Python Software Foundation'),),
376 (('organizationalUnitName', 'Python Core Development'),),
377 (('commonName', 'null.python.org\x00example.org'),),
378 (('emailAddress', 'python-dev@python.org'),))
379 self.assertEqual(p['subject'], subject)
380 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200381 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
382 san = (('DNS', 'altnull.python.org\x00example.com'),
383 ('email', 'null@python.org\x00user@example.org'),
384 ('URI', 'http://null.python.org\x00http://example.org'),
385 ('IP Address', '192.0.2.1'),
386 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
387 else:
388 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
389 san = (('DNS', 'altnull.python.org\x00example.com'),
390 ('email', 'null@python.org\x00user@example.org'),
391 ('URI', 'http://null.python.org\x00http://example.org'),
392 ('IP Address', '192.0.2.1'),
393 ('IP Address', '<invalid>'))
394
395 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200396
Christian Heimes1c03abd2016-09-06 23:25:35 +0200397 def test_parse_all_sans(self):
398 p = ssl._ssl._test_decode_cert(ALLSANFILE)
399 self.assertEqual(p['subjectAltName'],
400 (
401 ('DNS', 'allsans'),
402 ('othername', '<unsupported>'),
403 ('othername', '<unsupported>'),
404 ('email', 'user@example.org'),
405 ('DNS', 'www.example.org'),
406 ('DirName',
407 ((('countryName', 'XY'),),
408 (('localityName', 'Castle Anthrax'),),
409 (('organizationName', 'Python Software Foundation'),),
410 (('commonName', 'dirname example'),))),
411 ('URI', 'https://www.python.org/'),
412 ('IP Address', '127.0.0.1'),
413 ('IP Address', '0:0:0:0:0:0:0:1\n'),
414 ('Registered ID', '1.2.3.4.5')
415 )
416 )
417
Antoine Pitrou480a1242010-04-28 21:37:09 +0000418 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000419 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 d1 = ssl.PEM_cert_to_DER_cert(pem)
422 p2 = ssl.DER_cert_to_PEM_cert(d1)
423 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000424 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000425 if not p2.startswith(ssl.PEM_HEADER + '\n'):
426 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
427 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
428 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000429
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000430 def test_openssl_version(self):
431 n = ssl.OPENSSL_VERSION_NUMBER
432 t = ssl.OPENSSL_VERSION_INFO
433 s = ssl.OPENSSL_VERSION
434 self.assertIsInstance(n, int)
435 self.assertIsInstance(t, tuple)
436 self.assertIsInstance(s, str)
437 # Some sanity checks follow
438 # >= 0.9
439 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400440 # < 3.0
441 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000442 major, minor, fix, patch, status = t
443 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400444 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000445 self.assertGreaterEqual(minor, 0)
446 self.assertLess(minor, 256)
447 self.assertGreaterEqual(fix, 0)
448 self.assertLess(fix, 256)
449 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100450 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000451 self.assertGreaterEqual(status, 0)
452 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400453 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200454 if IS_LIBRESSL:
455 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100456 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400457 else:
458 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100459 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000460
Antoine Pitrou9d543662010-04-23 23:10:32 +0000461 @support.cpython_only
462 def test_refcycle(self):
463 # Issue #7943: an SSL object doesn't create reference cycles with
464 # itself.
465 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200466 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000467 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100468 with support.check_warnings(("", ResourceWarning)):
469 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100470 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000471
Antoine Pitroua468adc2010-09-14 14:43:44 +0000472 def test_wrapped_unconnected(self):
473 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200474 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000475 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100477 self.assertRaises(OSError, ss.recv, 1)
478 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
479 self.assertRaises(OSError, ss.recvfrom, 1)
480 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
481 self.assertRaises(OSError, ss.send, b'x')
482 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800483 self.assertRaises(NotImplementedError, ss.sendmsg,
484 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000485
Antoine Pitrou40f08742010-04-24 22:04:40 +0000486 def test_timeout(self):
487 # Issue #8524: when creating an SSL socket, the timeout of the
488 # original socket should be retained.
489 for timeout in (None, 0.0, 5.0):
490 s = socket.socket(socket.AF_INET)
491 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200492 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100493 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000494
Christian Heimesd0486372016-09-10 23:23:33 +0200495 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000497 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000498 "certfile must be specified",
499 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000500 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000501 "certfile must be specified for server-side operations",
502 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000503 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000504 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200505 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100506 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
507 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200508 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200509 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000510 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000511 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000512 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200513 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000515 ssl.wrap_socket(sock,
516 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000517 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200518 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000519 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000520 ssl.wrap_socket(sock,
521 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000522 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000523
Martin Panter3464ea22016-02-01 21:58:11 +0000524 def bad_cert_test(self, certfile):
525 """Check that trying to use the given client certificate fails"""
526 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
527 certfile)
528 sock = socket.socket()
529 self.addCleanup(sock.close)
530 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200531 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200532 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000533
534 def test_empty_cert(self):
535 """Wrapping with an empty cert file"""
536 self.bad_cert_test("nullcert.pem")
537
538 def test_malformed_cert(self):
539 """Wrapping with a badly formatted certificate (syntax error)"""
540 self.bad_cert_test("badcert.pem")
541
542 def test_malformed_key(self):
543 """Wrapping with a badly formatted key (syntax error)"""
544 self.bad_cert_test("badkey.pem")
545
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000546 def test_match_hostname(self):
547 def ok(cert, hostname):
548 ssl.match_hostname(cert, hostname)
549 def fail(cert, hostname):
550 self.assertRaises(ssl.CertificateError,
551 ssl.match_hostname, cert, hostname)
552
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100553 # -- Hostname matching --
554
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'example.com'),),)}
556 ok(cert, 'example.com')
557 ok(cert, 'ExAmple.cOm')
558 fail(cert, 'www.example.com')
559 fail(cert, '.example.com')
560 fail(cert, 'example.org')
561 fail(cert, 'exampleXcom')
562
563 cert = {'subject': ((('commonName', '*.a.com'),),)}
564 ok(cert, 'foo.a.com')
565 fail(cert, 'bar.foo.a.com')
566 fail(cert, 'a.com')
567 fail(cert, 'Xa.com')
568 fail(cert, '.a.com')
569
Mandeep Singhede2ac92017-11-27 04:01:27 +0530570 # only match wildcards when they are the only thing
571 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000572 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530573 fail(cert, 'foo.com')
574 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000575 fail(cert, 'bar.com')
576 fail(cert, 'foo.a.com')
577 fail(cert, 'bar.foo.com')
578
Christian Heimes824f7f32013-08-17 00:54:47 +0200579 # NULL bytes are bad, CVE-2013-4073
580 cert = {'subject': ((('commonName',
581 'null.python.org\x00example.org'),),)}
582 ok(cert, 'null.python.org\x00example.org') # or raise an error?
583 fail(cert, 'example.org')
584 fail(cert, 'null.python.org')
585
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 # error cases with wildcards
587 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
588 fail(cert, 'bar.foo.a.com')
589 fail(cert, 'a.com')
590 fail(cert, 'Xa.com')
591 fail(cert, '.a.com')
592
593 cert = {'subject': ((('commonName', 'a.*.com'),),)}
594 fail(cert, 'a.foo.com')
595 fail(cert, 'a..com')
596 fail(cert, 'a.com')
597
598 # wildcard doesn't match IDNA prefix 'xn--'
599 idna = 'püthon.python.org'.encode("idna").decode("ascii")
600 cert = {'subject': ((('commonName', idna),),)}
601 ok(cert, idna)
602 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
603 fail(cert, idna)
604 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
605 fail(cert, idna)
606
607 # wildcard in first fragment and IDNA A-labels in sequent fragments
608 # are supported.
609 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
610 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530611 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
612 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
614 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
615
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000616 # Slightly fake real-world example
617 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
618 'subject': ((('commonName', 'linuxfrz.org'),),),
619 'subjectAltName': (('DNS', 'linuxfr.org'),
620 ('DNS', 'linuxfr.com'),
621 ('othername', '<unsupported>'))}
622 ok(cert, 'linuxfr.org')
623 ok(cert, 'linuxfr.com')
624 # Not a "DNS" entry
625 fail(cert, '<unsupported>')
626 # When there is a subjectAltName, commonName isn't used
627 fail(cert, 'linuxfrz.org')
628
629 # A pristine real-world example
630 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
631 'subject': ((('countryName', 'US'),),
632 (('stateOrProvinceName', 'California'),),
633 (('localityName', 'Mountain View'),),
634 (('organizationName', 'Google Inc'),),
635 (('commonName', 'mail.google.com'),))}
636 ok(cert, 'mail.google.com')
637 fail(cert, 'gmail.com')
638 # Only commonName is considered
639 fail(cert, 'California')
640
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100641 # -- IPv4 matching --
642 cert = {'subject': ((('commonName', 'example.com'),),),
643 'subjectAltName': (('DNS', 'example.com'),
644 ('IP Address', '10.11.12.13'),
645 ('IP Address', '14.15.16.17'))}
646 ok(cert, '10.11.12.13')
647 ok(cert, '14.15.16.17')
648 fail(cert, '14.15.16.18')
649 fail(cert, 'example.net')
650
651 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800652 if hasattr(socket, 'AF_INET6'):
653 cert = {'subject': ((('commonName', 'example.com'),),),
654 'subjectAltName': (
655 ('DNS', 'example.com'),
656 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
657 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
658 ok(cert, '2001::cafe')
659 ok(cert, '2003::baba')
660 fail(cert, '2003::bebe')
661 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100662
663 # -- Miscellaneous --
664
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000665 # Neither commonName nor subjectAltName
666 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),))}
671 fail(cert, 'mail.google.com')
672
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200673 # No DNS entry in subjectAltName but a commonName
674 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
675 'subject': ((('countryName', 'US'),),
676 (('stateOrProvinceName', 'California'),),
677 (('localityName', 'Mountain View'),),
678 (('commonName', 'mail.google.com'),)),
679 'subjectAltName': (('othername', 'blabla'), )}
680 ok(cert, 'mail.google.com')
681
682 # No DNS entry subjectAltName and no commonName
683 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
684 'subject': ((('countryName', 'US'),),
685 (('stateOrProvinceName', 'California'),),
686 (('localityName', 'Mountain View'),),
687 (('organizationName', 'Google Inc'),)),
688 'subjectAltName': (('othername', 'blabla'),)}
689 fail(cert, 'google.com')
690
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000691 # Empty cert / no cert
692 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
693 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
694
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200695 # Issue #17980: avoid denials of service by refusing more than one
696 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800697 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
698 with self.assertRaisesRegex(
699 ssl.CertificateError,
700 "partial wildcards in leftmost label are not supported"):
701 ssl.match_hostname(cert, 'axxb.example.com')
702
703 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
704 with self.assertRaisesRegex(
705 ssl.CertificateError,
706 "wildcard can only be present in the leftmost label"):
707 ssl.match_hostname(cert, 'www.sub.example.com')
708
709 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
710 with self.assertRaisesRegex(
711 ssl.CertificateError,
712 "too many wildcards"):
713 ssl.match_hostname(cert, 'axxbxxc.example.com')
714
715 cert = {'subject': ((('commonName', '*'),),)}
716 with self.assertRaisesRegex(
717 ssl.CertificateError,
718 "sole wildcard without additional labels are not support"):
719 ssl.match_hostname(cert, 'host')
720
721 cert = {'subject': ((('commonName', '*.com'),),)}
722 with self.assertRaisesRegex(
723 ssl.CertificateError,
724 r"hostname 'com' doesn't match '\*.com'"):
725 ssl.match_hostname(cert, 'com')
726
727 # extra checks for _inet_paton()
728 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
729 with self.assertRaises(ValueError):
730 ssl._inet_paton(invalid)
731 for ipaddr in ['127.0.0.1', '192.168.0.1']:
732 self.assertTrue(ssl._inet_paton(ipaddr))
733 if hasattr(socket, 'AF_INET6'):
734 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
735 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200736
Antoine Pitroud5323212010-10-22 18:19:07 +0000737 def test_server_side(self):
738 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200739 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000740 with socket.socket() as sock:
741 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
742 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000743
Antoine Pitroud6494802011-07-21 01:11:30 +0200744 def test_unknown_channel_binding(self):
745 # should raise ValueError for unknown type
746 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200747 s.bind(('127.0.0.1', 0))
748 s.listen()
749 c = socket.socket(socket.AF_INET)
750 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200751 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100752 with self.assertRaises(ValueError):
753 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200754 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200755
756 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
757 "'tls-unique' channel binding not available")
758 def test_tls_unique_channel_binding(self):
759 # unconnected should return None for known type
760 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200761 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100762 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200763 # the same for server-side
764 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200765 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100766 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200767
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600768 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200769 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600770 r = repr(ss)
771 with self.assertWarns(ResourceWarning) as cm:
772 ss = None
773 support.gc_collect()
774 self.assertIn(r, str(cm.warning.args[0]))
775
Christian Heimes6d7ad132013-06-09 18:02:55 +0200776 def test_get_default_verify_paths(self):
777 paths = ssl.get_default_verify_paths()
778 self.assertEqual(len(paths), 6)
779 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
780
781 with support.EnvironmentVarGuard() as env:
782 env["SSL_CERT_DIR"] = CAPATH
783 env["SSL_CERT_FILE"] = CERTFILE
784 paths = ssl.get_default_verify_paths()
785 self.assertEqual(paths.cafile, CERTFILE)
786 self.assertEqual(paths.capath, CAPATH)
787
Christian Heimes44109d72013-11-22 01:51:30 +0100788 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
789 def test_enum_certificates(self):
790 self.assertTrue(ssl.enum_certificates("CA"))
791 self.assertTrue(ssl.enum_certificates("ROOT"))
792
793 self.assertRaises(TypeError, ssl.enum_certificates)
794 self.assertRaises(WindowsError, ssl.enum_certificates, "")
795
Christian Heimesc2d65e12013-11-22 16:13:55 +0100796 trust_oids = set()
797 for storename in ("CA", "ROOT"):
798 store = ssl.enum_certificates(storename)
799 self.assertIsInstance(store, list)
800 for element in store:
801 self.assertIsInstance(element, tuple)
802 self.assertEqual(len(element), 3)
803 cert, enc, trust = element
804 self.assertIsInstance(cert, bytes)
805 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
806 self.assertIsInstance(trust, (set, bool))
807 if isinstance(trust, set):
808 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100809
810 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100811 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200812
Christian Heimes46bebee2013-06-09 19:03:31 +0200813 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100814 def test_enum_crls(self):
815 self.assertTrue(ssl.enum_crls("CA"))
816 self.assertRaises(TypeError, ssl.enum_crls)
817 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200818
Christian Heimes44109d72013-11-22 01:51:30 +0100819 crls = ssl.enum_crls("CA")
820 self.assertIsInstance(crls, list)
821 for element in crls:
822 self.assertIsInstance(element, tuple)
823 self.assertEqual(len(element), 2)
824 self.assertIsInstance(element[0], bytes)
825 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimes46bebee2013-06-09 19:03:31 +0200827
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100828 def test_asn1object(self):
829 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
830 '1.3.6.1.5.5.7.3.1')
831
832 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
833 self.assertEqual(val, expected)
834 self.assertEqual(val.nid, 129)
835 self.assertEqual(val.shortname, 'serverAuth')
836 self.assertEqual(val.longname, 'TLS Web Server Authentication')
837 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
838 self.assertIsInstance(val, ssl._ASN1Object)
839 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
840
841 val = ssl._ASN1Object.fromnid(129)
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100845 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
846 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100847 for i in range(1000):
848 try:
849 obj = ssl._ASN1Object.fromnid(i)
850 except ValueError:
851 pass
852 else:
853 self.assertIsInstance(obj.nid, int)
854 self.assertIsInstance(obj.shortname, str)
855 self.assertIsInstance(obj.longname, str)
856 self.assertIsInstance(obj.oid, (str, type(None)))
857
858 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
859 self.assertEqual(val, expected)
860 self.assertIsInstance(val, ssl._ASN1Object)
861 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
862 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
863 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100864 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
865 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100866
Christian Heimes72d28502013-11-23 13:56:58 +0100867 def test_purpose_enum(self):
868 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
869 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
873 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
874 '1.3.6.1.5.5.7.3.1')
875
876 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
877 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
881 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
882 '1.3.6.1.5.5.7.3.2')
883
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100884 def test_unsupported_dtls(self):
885 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
886 self.addCleanup(s.close)
887 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200888 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100889 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200890 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100891 with self.assertRaises(NotImplementedError) as cx:
892 ctx.wrap_socket(s)
893 self.assertEqual(str(cx.exception), "only stream sockets are supported")
894
Antoine Pitrouc695c952014-04-28 20:57:36 +0200895 def cert_time_ok(self, timestring, timestamp):
896 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
897
898 def cert_time_fail(self, timestring):
899 with self.assertRaises(ValueError):
900 ssl.cert_time_to_seconds(timestring)
901
902 @unittest.skipUnless(utc_offset(),
903 'local time needs to be different from UTC')
904 def test_cert_time_to_seconds_timezone(self):
905 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
906 # results if local timezone is not UTC
907 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
908 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
909
910 def test_cert_time_to_seconds(self):
911 timestring = "Jan 5 09:34:43 2018 GMT"
912 ts = 1515144883.0
913 self.cert_time_ok(timestring, ts)
914 # accept keyword parameter, assert its name
915 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
916 # accept both %e and %d (space or zero generated by strftime)
917 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
918 # case-insensitive
919 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
920 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
921 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
922 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
923 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
924 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
925 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
926 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
927
928 newyear_ts = 1230768000.0
929 # leap seconds
930 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
931 # same timestamp
932 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
933
934 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
935 # allow 60th second (even if it is not a leap second)
936 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
937 # allow 2nd leap second for compatibility with time.strptime()
938 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
939 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
940
Mike53f7a7c2017-12-14 14:04:53 +0300941 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200942 # 99991231235959Z (rfc 5280)
943 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
944
945 @support.run_with_locale('LC_ALL', '')
946 def test_cert_time_to_seconds_locale(self):
947 # `cert_time_to_seconds()` should be locale independent
948
949 def local_february_name():
950 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
951
952 if local_february_name().lower() == 'feb':
953 self.skipTest("locale-specific month name needs to be "
954 "different from C locale")
955
956 # locale-independent
957 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
958 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
959
Martin Panter3840b2a2016-03-27 01:53:46 +0000960 def test_connect_ex_error(self):
961 server = socket.socket(socket.AF_INET)
962 self.addCleanup(server.close)
963 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200964 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000965 cert_reqs=ssl.CERT_REQUIRED)
966 self.addCleanup(s.close)
967 rc = s.connect_ex((HOST, port))
968 # Issue #19919: Windows machines or VMs hosted on Windows
969 # machines sometimes return EWOULDBLOCK.
970 errors = (
971 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
972 errno.EWOULDBLOCK,
973 )
974 self.assertIn(rc, errors)
975
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100976
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977class ContextTests(unittest.TestCase):
978
Antoine Pitrou23df4832010-08-04 17:14:06 +0000979 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000980 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100981 for protocol in PROTOCOLS:
982 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200983 ctx = ssl.SSLContext()
984 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000985 self.assertRaises(ValueError, ssl.SSLContext, -1)
986 self.assertRaises(ValueError, ssl.SSLContext, 42)
987
Antoine Pitrou23df4832010-08-04 17:14:06 +0000988 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000989 def test_protocol(self):
990 for proto in PROTOCOLS:
991 ctx = ssl.SSLContext(proto)
992 self.assertEqual(ctx.protocol, proto)
993
994 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000996 ctx.set_ciphers("ALL")
997 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000998 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000999 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001000
Christian Heimes892d66e2018-01-29 14:10:18 +01001001 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1002 "Test applies only to Python default ciphers")
1003 def test_python_ciphers(self):
1004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1005 ciphers = ctx.get_ciphers()
1006 for suite in ciphers:
1007 name = suite['name']
1008 self.assertNotIn("PSK", name)
1009 self.assertNotIn("SRP", name)
1010 self.assertNotIn("MD5", name)
1011 self.assertNotIn("RC4", name)
1012 self.assertNotIn("3DES", name)
1013
Christian Heimes25bfcd52016-09-06 00:04:45 +02001014 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1015 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001016 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001017 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001018 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001019 self.assertIn('AES256-GCM-SHA384', names)
1020 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001021
Antoine Pitrou23df4832010-08-04 17:14:06 +00001022 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001023 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001024 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001025 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001026 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001027 # SSLContext also enables these by default
1028 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001029 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1030 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001031 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001032 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001033 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001034 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001035 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1036 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001037 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001038 # Ubuntu has OP_NO_SSLv3 forced on by default
1039 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001040 else:
1041 with self.assertRaises(ValueError):
1042 ctx.options = 0
1043
Christian Heimesa170fa12017-09-15 20:27:30 +02001044 def test_verify_mode_protocol(self):
1045 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001046 # Default value
1047 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1048 ctx.verify_mode = ssl.CERT_OPTIONAL
1049 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1050 ctx.verify_mode = ssl.CERT_REQUIRED
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 ctx.verify_mode = ssl.CERT_NONE
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1054 with self.assertRaises(TypeError):
1055 ctx.verify_mode = None
1056 with self.assertRaises(ValueError):
1057 ctx.verify_mode = 42
1058
Christian Heimesa170fa12017-09-15 20:27:30 +02001059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1060 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1061 self.assertFalse(ctx.check_hostname)
1062
1063 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1065 self.assertTrue(ctx.check_hostname)
1066
Christian Heimes61d478c2018-01-27 15:51:38 +01001067 def test_hostname_checks_common_name(self):
1068 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1069 self.assertTrue(ctx.hostname_checks_common_name)
1070 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1071 ctx.hostname_checks_common_name = True
1072 self.assertTrue(ctx.hostname_checks_common_name)
1073 ctx.hostname_checks_common_name = False
1074 self.assertFalse(ctx.hostname_checks_common_name)
1075 ctx.hostname_checks_common_name = True
1076 self.assertTrue(ctx.hostname_checks_common_name)
1077 else:
1078 with self.assertRaises(AttributeError):
1079 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001080
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001081 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1082 "required OpenSSL 1.1.0g")
1083 def test_min_max_version(self):
1084 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1085 self.assertEqual(
1086 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1087 )
1088 self.assertEqual(
1089 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1090 )
1091
1092 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1093 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1094 self.assertEqual(
1095 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1096 )
1097 self.assertEqual(
1098 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1102 ctx.maximum_version = ssl.TLSVersion.TLSv1
1103 self.assertEqual(
1104 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1105 )
1106 self.assertEqual(
1107 ctx.maximum_version, ssl.TLSVersion.TLSv1
1108 )
1109
1110 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1111 self.assertEqual(
1112 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1113 )
1114
1115 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1116 self.assertIn(
1117 ctx.maximum_version,
1118 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1119 )
1120
1121 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1122 self.assertIn(
1123 ctx.minimum_version,
1124 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1125 )
1126
1127 with self.assertRaises(ValueError):
1128 ctx.minimum_version = 42
1129
1130 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1131
1132 self.assertEqual(
1133 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1134 )
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 )
1138 with self.assertRaises(ValueError):
1139 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1140 with self.assertRaises(ValueError):
1141 ctx.maximum_version = ssl.TLSVersion.TLSv1
1142
1143
Christian Heimes2427b502013-11-23 11:24:32 +01001144 @unittest.skipUnless(have_verify_flags(),
1145 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001146 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001147 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001148 # default value
1149 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1150 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001151 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1152 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1153 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1154 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1155 ctx.verify_flags = ssl.VERIFY_DEFAULT
1156 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1157 # supports any value
1158 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1159 self.assertEqual(ctx.verify_flags,
1160 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1161 with self.assertRaises(TypeError):
1162 ctx.verify_flags = None
1163
Antoine Pitrou152efa22010-05-16 18:19:27 +00001164 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001165 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001166 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001167 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1169 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001170 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001171 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001172 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001173 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001174 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001175 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001176 ctx.load_cert_chain(EMPTYCERT)
1177 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001178 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001179 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1180 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1181 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001182 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001183 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001184 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001185 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001186 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001187 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1188 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001189 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001190 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001191 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001192 # Password protected key and cert
1193 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1194 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1195 ctx.load_cert_chain(CERTFILE_PROTECTED,
1196 password=bytearray(KEY_PASSWORD.encode()))
1197 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1198 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1199 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1200 bytearray(KEY_PASSWORD.encode()))
1201 with self.assertRaisesRegex(TypeError, "should be a string"):
1202 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1203 with self.assertRaises(ssl.SSLError):
1204 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1205 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1206 # openssl has a fixed limit on the password buffer.
1207 # PEM_BUFSIZE is generally set to 1kb.
1208 # Return a string larger than this.
1209 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1210 # Password callback
1211 def getpass_unicode():
1212 return KEY_PASSWORD
1213 def getpass_bytes():
1214 return KEY_PASSWORD.encode()
1215 def getpass_bytearray():
1216 return bytearray(KEY_PASSWORD.encode())
1217 def getpass_badpass():
1218 return "badpass"
1219 def getpass_huge():
1220 return b'a' * (1024 * 1024)
1221 def getpass_bad_type():
1222 return 9
1223 def getpass_exception():
1224 raise Exception('getpass error')
1225 class GetPassCallable:
1226 def __call__(self):
1227 return KEY_PASSWORD
1228 def getpass(self):
1229 return KEY_PASSWORD
1230 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1232 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1233 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1234 ctx.load_cert_chain(CERTFILE_PROTECTED,
1235 password=GetPassCallable().getpass)
1236 with self.assertRaises(ssl.SSLError):
1237 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1238 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1239 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1240 with self.assertRaisesRegex(TypeError, "must return a string"):
1241 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1242 with self.assertRaisesRegex(Exception, "getpass error"):
1243 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1244 # Make sure the password function isn't called if it isn't needed
1245 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001246
1247 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001248 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001249 ctx.load_verify_locations(CERTFILE)
1250 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1251 ctx.load_verify_locations(BYTES_CERTFILE)
1252 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1253 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001254 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001255 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001256 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001257 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001258 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001259 ctx.load_verify_locations(BADCERT)
1260 ctx.load_verify_locations(CERTFILE, CAPATH)
1261 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1262
Victor Stinner80f75e62011-01-29 11:31:20 +00001263 # Issue #10989: crash if the second argument type is invalid
1264 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1265
Christian Heimesefff7062013-11-21 03:35:02 +01001266 def test_load_verify_cadata(self):
1267 # test cadata
1268 with open(CAFILE_CACERT) as f:
1269 cacert_pem = f.read()
1270 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1271 with open(CAFILE_NEURONIO) as f:
1272 neuronio_pem = f.read()
1273 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1274
1275 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001276 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001277 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1278 ctx.load_verify_locations(cadata=cacert_pem)
1279 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1280 ctx.load_verify_locations(cadata=neuronio_pem)
1281 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1282 # cert already in hash table
1283 ctx.load_verify_locations(cadata=neuronio_pem)
1284 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1285
1286 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001287 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001288 combined = "\n".join((cacert_pem, neuronio_pem))
1289 ctx.load_verify_locations(cadata=combined)
1290 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1291
1292 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001293 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001294 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1295 neuronio_pem, "tail"]
1296 ctx.load_verify_locations(cadata="\n".join(combined))
1297 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1298
1299 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001300 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001301 ctx.load_verify_locations(cadata=cacert_der)
1302 ctx.load_verify_locations(cadata=neuronio_der)
1303 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1304 # cert already in hash table
1305 ctx.load_verify_locations(cadata=cacert_der)
1306 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1307
1308 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001310 combined = b"".join((cacert_der, neuronio_der))
1311 ctx.load_verify_locations(cadata=combined)
1312 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1313
1314 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001315 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001316 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1317
1318 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1319 ctx.load_verify_locations(cadata="broken")
1320 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1321 ctx.load_verify_locations(cadata=b"broken")
1322
1323
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001324 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001325 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001326 ctx.load_dh_params(DHFILE)
1327 if os.name != 'nt':
1328 ctx.load_dh_params(BYTES_DHFILE)
1329 self.assertRaises(TypeError, ctx.load_dh_params)
1330 self.assertRaises(TypeError, ctx.load_dh_params, None)
1331 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001332 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001333 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001334 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001335 ctx.load_dh_params(CERTFILE)
1336
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001337 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001338 def test_session_stats(self):
1339 for proto in PROTOCOLS:
1340 ctx = ssl.SSLContext(proto)
1341 self.assertEqual(ctx.session_stats(), {
1342 'number': 0,
1343 'connect': 0,
1344 'connect_good': 0,
1345 'connect_renegotiate': 0,
1346 'accept': 0,
1347 'accept_good': 0,
1348 'accept_renegotiate': 0,
1349 'hits': 0,
1350 'misses': 0,
1351 'timeouts': 0,
1352 'cache_full': 0,
1353 })
1354
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001355 def test_set_default_verify_paths(self):
1356 # There's not much we can do to test that it acts as expected,
1357 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001359 ctx.set_default_verify_paths()
1360
Antoine Pitrou501da612011-12-21 09:27:41 +01001361 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001362 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001363 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001364 ctx.set_ecdh_curve("prime256v1")
1365 ctx.set_ecdh_curve(b"prime256v1")
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1367 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1369 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1370
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001371 @needs_sni
1372 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001373 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001374
1375 # set_servername_callback expects a callable, or None
1376 self.assertRaises(TypeError, ctx.set_servername_callback)
1377 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1378 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1379 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1380
1381 def dummycallback(sock, servername, ctx):
1382 pass
1383 ctx.set_servername_callback(None)
1384 ctx.set_servername_callback(dummycallback)
1385
1386 @needs_sni
1387 def test_sni_callback_refcycle(self):
1388 # Reference cycles through the servername callback are detected
1389 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001390 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001391 def dummycallback(sock, servername, ctx, cycle=ctx):
1392 pass
1393 ctx.set_servername_callback(dummycallback)
1394 wr = weakref.ref(ctx)
1395 del ctx, dummycallback
1396 gc.collect()
1397 self.assertIs(wr(), None)
1398
Christian Heimes9a5395a2013-06-17 15:44:12 +02001399 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001400 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001401 self.assertEqual(ctx.cert_store_stats(),
1402 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1403 ctx.load_cert_chain(CERTFILE)
1404 self.assertEqual(ctx.cert_store_stats(),
1405 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1406 ctx.load_verify_locations(CERTFILE)
1407 self.assertEqual(ctx.cert_store_stats(),
1408 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001409 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001410 self.assertEqual(ctx.cert_store_stats(),
1411 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1412
1413 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001414 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001415 self.assertEqual(ctx.get_ca_certs(), [])
1416 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1417 ctx.load_verify_locations(CERTFILE)
1418 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001419 # but CAFILE_CACERT is a CA cert
1420 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001421 self.assertEqual(ctx.get_ca_certs(),
1422 [{'issuer': ((('organizationName', 'Root CA'),),
1423 (('organizationalUnitName', 'http://www.cacert.org'),),
1424 (('commonName', 'CA Cert Signing Authority'),),
1425 (('emailAddress', 'support@cacert.org'),)),
1426 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1427 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1428 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001429 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001430 'subject': ((('organizationName', 'Root CA'),),
1431 (('organizationalUnitName', 'http://www.cacert.org'),),
1432 (('commonName', 'CA Cert Signing Authority'),),
1433 (('emailAddress', 'support@cacert.org'),)),
1434 'version': 3}])
1435
Martin Panterb55f8b72016-01-14 12:53:56 +00001436 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001437 pem = f.read()
1438 der = ssl.PEM_cert_to_DER_cert(pem)
1439 self.assertEqual(ctx.get_ca_certs(True), [der])
1440
Christian Heimes72d28502013-11-23 13:56:58 +01001441 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001442 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001443 ctx.load_default_certs()
1444
Christian Heimesa170fa12017-09-15 20:27:30 +02001445 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001446 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1447 ctx.load_default_certs()
1448
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001450 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1451
Christian Heimesa170fa12017-09-15 20:27:30 +02001452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001453 self.assertRaises(TypeError, ctx.load_default_certs, None)
1454 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1455
Benjamin Peterson91244e02014-10-03 18:17:15 -04001456 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001457 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001458 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001459 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001460 with support.EnvironmentVarGuard() as env:
1461 env["SSL_CERT_DIR"] = CAPATH
1462 env["SSL_CERT_FILE"] = CERTFILE
1463 ctx.load_default_certs()
1464 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1465
Benjamin Peterson91244e02014-10-03 18:17:15 -04001466 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001467 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001468 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001469 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001470 ctx.load_default_certs()
1471 stats = ctx.cert_store_stats()
1472
Christian Heimesa170fa12017-09-15 20:27:30 +02001473 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001474 with support.EnvironmentVarGuard() as env:
1475 env["SSL_CERT_DIR"] = CAPATH
1476 env["SSL_CERT_FILE"] = CERTFILE
1477 ctx.load_default_certs()
1478 stats["x509"] += 1
1479 self.assertEqual(ctx.cert_store_stats(), stats)
1480
Christian Heimes358cfd42016-09-10 22:43:48 +02001481 def _assert_context_options(self, ctx):
1482 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1483 if OP_NO_COMPRESSION != 0:
1484 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1485 OP_NO_COMPRESSION)
1486 if OP_SINGLE_DH_USE != 0:
1487 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1488 OP_SINGLE_DH_USE)
1489 if OP_SINGLE_ECDH_USE != 0:
1490 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1491 OP_SINGLE_ECDH_USE)
1492 if OP_CIPHER_SERVER_PREFERENCE != 0:
1493 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1494 OP_CIPHER_SERVER_PREFERENCE)
1495
Christian Heimes4c05b472013-11-23 15:58:30 +01001496 def test_create_default_context(self):
1497 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001498
Christian Heimesa170fa12017-09-15 20:27:30 +02001499 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001500 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001501 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001502 self._assert_context_options(ctx)
1503
Christian Heimes4c05b472013-11-23 15:58:30 +01001504 with open(SIGNING_CA) as f:
1505 cadata = f.read()
1506 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1507 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001508 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001509 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001510 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001511
1512 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001513 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001514 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001515 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001516
Christian Heimes67986f92013-11-23 22:43:47 +01001517 def test__create_stdlib_context(self):
1518 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001519 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001520 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001521 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001522 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001523
1524 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1525 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1526 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001527 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001528
1529 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001530 cert_reqs=ssl.CERT_REQUIRED,
1531 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001532 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001534 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001535 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001536
1537 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001538 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001539 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001540 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001543 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001544 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001545 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546
Christian Heimese82c0342017-09-15 20:29:57 +02001547 # Auto set CERT_REQUIRED
1548 ctx.check_hostname = True
1549 self.assertTrue(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1551 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001552 ctx.verify_mode = ssl.CERT_REQUIRED
1553 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001554 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
Christian Heimese82c0342017-09-15 20:29:57 +02001556 # Changing verify_mode does not affect check_hostname
1557 ctx.check_hostname = False
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
1561 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1562 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563 ctx.check_hostname = True
1564 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001565 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1566
1567 ctx.check_hostname = False
1568 ctx.verify_mode = ssl.CERT_OPTIONAL
1569 ctx.check_hostname = False
1570 self.assertFalse(ctx.check_hostname)
1571 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1572 # keep CERT_OPTIONAL
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001576
1577 # Cannot set CERT_NONE with check_hostname enabled
1578 with self.assertRaises(ValueError):
1579 ctx.verify_mode = ssl.CERT_NONE
1580 ctx.check_hostname = False
1581 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001582 ctx.verify_mode = ssl.CERT_NONE
1583 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
Christian Heimes5fe668c2016-09-12 00:01:11 +02001585 def test_context_client_server(self):
1586 # PROTOCOL_TLS_CLIENT has sane defaults
1587 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1588 self.assertTrue(ctx.check_hostname)
1589 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1590
1591 # PROTOCOL_TLS_SERVER has different but also sane defaults
1592 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1595
Christian Heimes4df60f12017-09-15 20:26:05 +02001596 def test_context_custom_class(self):
1597 class MySSLSocket(ssl.SSLSocket):
1598 pass
1599
1600 class MySSLObject(ssl.SSLObject):
1601 pass
1602
1603 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1604 ctx.sslsocket_class = MySSLSocket
1605 ctx.sslobject_class = MySSLObject
1606
1607 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1608 self.assertIsInstance(sock, MySSLSocket)
1609 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1610 self.assertIsInstance(obj, MySSLObject)
1611
Antoine Pitrou152efa22010-05-16 18:19:27 +00001612
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001613class SSLErrorTests(unittest.TestCase):
1614
1615 def test_str(self):
1616 # The str() of a SSLError doesn't include the errno
1617 e = ssl.SSLError(1, "foo")
1618 self.assertEqual(str(e), "foo")
1619 self.assertEqual(e.errno, 1)
1620 # Same for a subclass
1621 e = ssl.SSLZeroReturnError(1, "foo")
1622 self.assertEqual(str(e), "foo")
1623 self.assertEqual(e.errno, 1)
1624
1625 def test_lib_reason(self):
1626 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001627 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001628 with self.assertRaises(ssl.SSLError) as cm:
1629 ctx.load_dh_params(CERTFILE)
1630 self.assertEqual(cm.exception.library, 'PEM')
1631 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1632 s = str(cm.exception)
1633 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1634
1635 def test_subclass(self):
1636 # Check that the appropriate SSLError subclass is raised
1637 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001638 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1639 ctx.check_hostname = False
1640 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001641 with socket.socket() as s:
1642 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001643 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001644 c = socket.socket()
1645 c.connect(s.getsockname())
1646 c.setblocking(False)
1647 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001648 with self.assertRaises(ssl.SSLWantReadError) as cm:
1649 c.do_handshake()
1650 s = str(cm.exception)
1651 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1652 # For compatibility
1653 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1654
1655
Christian Heimes61d478c2018-01-27 15:51:38 +01001656 def test_bad_server_hostname(self):
1657 ctx = ssl.create_default_context()
1658 with self.assertRaises(ValueError):
1659 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1660 server_hostname="")
1661 with self.assertRaises(ValueError):
1662 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1663 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001664 with self.assertRaises(TypeError):
1665 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1666 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001667
1668
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001669class MemoryBIOTests(unittest.TestCase):
1670
1671 def test_read_write(self):
1672 bio = ssl.MemoryBIO()
1673 bio.write(b'foo')
1674 self.assertEqual(bio.read(), b'foo')
1675 self.assertEqual(bio.read(), b'')
1676 bio.write(b'foo')
1677 bio.write(b'bar')
1678 self.assertEqual(bio.read(), b'foobar')
1679 self.assertEqual(bio.read(), b'')
1680 bio.write(b'baz')
1681 self.assertEqual(bio.read(2), b'ba')
1682 self.assertEqual(bio.read(1), b'z')
1683 self.assertEqual(bio.read(1), b'')
1684
1685 def test_eof(self):
1686 bio = ssl.MemoryBIO()
1687 self.assertFalse(bio.eof)
1688 self.assertEqual(bio.read(), b'')
1689 self.assertFalse(bio.eof)
1690 bio.write(b'foo')
1691 self.assertFalse(bio.eof)
1692 bio.write_eof()
1693 self.assertFalse(bio.eof)
1694 self.assertEqual(bio.read(2), b'fo')
1695 self.assertFalse(bio.eof)
1696 self.assertEqual(bio.read(1), b'o')
1697 self.assertTrue(bio.eof)
1698 self.assertEqual(bio.read(), b'')
1699 self.assertTrue(bio.eof)
1700
1701 def test_pending(self):
1702 bio = ssl.MemoryBIO()
1703 self.assertEqual(bio.pending, 0)
1704 bio.write(b'foo')
1705 self.assertEqual(bio.pending, 3)
1706 for i in range(3):
1707 bio.read(1)
1708 self.assertEqual(bio.pending, 3-i-1)
1709 for i in range(3):
1710 bio.write(b'x')
1711 self.assertEqual(bio.pending, i+1)
1712 bio.read()
1713 self.assertEqual(bio.pending, 0)
1714
1715 def test_buffer_types(self):
1716 bio = ssl.MemoryBIO()
1717 bio.write(b'foo')
1718 self.assertEqual(bio.read(), b'foo')
1719 bio.write(bytearray(b'bar'))
1720 self.assertEqual(bio.read(), b'bar')
1721 bio.write(memoryview(b'baz'))
1722 self.assertEqual(bio.read(), b'baz')
1723
1724 def test_error_types(self):
1725 bio = ssl.MemoryBIO()
1726 self.assertRaises(TypeError, bio.write, 'foo')
1727 self.assertRaises(TypeError, bio.write, None)
1728 self.assertRaises(TypeError, bio.write, True)
1729 self.assertRaises(TypeError, bio.write, 1)
1730
1731
Christian Heimes89c20512018-02-27 11:17:32 +01001732class SSLObjectTests(unittest.TestCase):
1733 def test_private_init(self):
1734 bio = ssl.MemoryBIO()
1735 with self.assertRaisesRegex(TypeError, "public constructor"):
1736 ssl.SSLObject(bio, bio)
1737
Miss Islington (bot)c00f7032018-09-21 22:00:42 -07001738 def test_unwrap(self):
1739 client_ctx, server_ctx, hostname = testing_context()
1740 c_in = ssl.MemoryBIO()
1741 c_out = ssl.MemoryBIO()
1742 s_in = ssl.MemoryBIO()
1743 s_out = ssl.MemoryBIO()
1744 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1745 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1746
1747 # Loop on the handshake for a bit to get it settled
1748 for _ in range(5):
1749 try:
1750 client.do_handshake()
1751 except ssl.SSLWantReadError:
1752 pass
1753 if c_out.pending:
1754 s_in.write(c_out.read())
1755 try:
1756 server.do_handshake()
1757 except ssl.SSLWantReadError:
1758 pass
1759 if s_out.pending:
1760 c_in.write(s_out.read())
1761 # Now the handshakes should be complete (don't raise WantReadError)
1762 client.do_handshake()
1763 server.do_handshake()
1764
1765 # Now if we unwrap one side unilaterally, it should send close-notify
1766 # and raise WantReadError:
1767 with self.assertRaises(ssl.SSLWantReadError):
1768 client.unwrap()
1769
1770 # But server.unwrap() does not raise, because it reads the client's
1771 # close-notify:
1772 s_in.write(c_out.read())
1773 server.unwrap()
1774
1775 # And now that the client gets the server's close-notify, it doesn't
1776 # raise either.
1777 c_in.write(s_out.read())
1778 client.unwrap()
Christian Heimes89c20512018-02-27 11:17:32 +01001779
Martin Panter3840b2a2016-03-27 01:53:46 +00001780class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001781 """Tests that connect to a simple server running in the background"""
1782
1783 def setUp(self):
1784 server = ThreadedEchoServer(SIGNED_CERTFILE)
1785 self.server_addr = (HOST, server.port)
1786 server.__enter__()
1787 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001788
Antoine Pitrou480a1242010-04-28 21:37:09 +00001789 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001790 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001791 cert_reqs=ssl.CERT_NONE) as s:
1792 s.connect(self.server_addr)
1793 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001794 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001795
Martin Panter3840b2a2016-03-27 01:53:46 +00001796 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001797 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001798 cert_reqs=ssl.CERT_REQUIRED,
1799 ca_certs=SIGNING_CA) as s:
1800 s.connect(self.server_addr)
1801 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001802 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001803
Martin Panter3840b2a2016-03-27 01:53:46 +00001804 def test_connect_fail(self):
1805 # This should fail because we have no verification certs. Connection
1806 # failure crashes ThreadedEchoServer, so run this in an independent
1807 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001808 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001809 cert_reqs=ssl.CERT_REQUIRED)
1810 self.addCleanup(s.close)
1811 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1812 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001813
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001814 def test_connect_ex(self):
1815 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001816 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001817 cert_reqs=ssl.CERT_REQUIRED,
1818 ca_certs=SIGNING_CA)
1819 self.addCleanup(s.close)
1820 self.assertEqual(0, s.connect_ex(self.server_addr))
1821 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001822
1823 def test_non_blocking_connect_ex(self):
1824 # Issue #11326: non-blocking connect_ex() should allow handshake
1825 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001826 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001827 cert_reqs=ssl.CERT_REQUIRED,
1828 ca_certs=SIGNING_CA,
1829 do_handshake_on_connect=False)
1830 self.addCleanup(s.close)
1831 s.setblocking(False)
1832 rc = s.connect_ex(self.server_addr)
1833 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1834 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1835 # Wait for connect to finish
1836 select.select([], [s], [], 5.0)
1837 # Non-blocking handshake
1838 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001839 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001840 s.do_handshake()
1841 break
1842 except ssl.SSLWantReadError:
1843 select.select([s], [], [], 5.0)
1844 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001845 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001846 # SSL established
1847 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001848
Antoine Pitrou152efa22010-05-16 18:19:27 +00001849 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001850 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001851 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001852 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1853 s.connect(self.server_addr)
1854 self.assertEqual({}, s.getpeercert())
1855 # Same with a server hostname
1856 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1857 server_hostname="dummy") as s:
1858 s.connect(self.server_addr)
1859 ctx.verify_mode = ssl.CERT_REQUIRED
1860 # This should succeed because we specify the root cert
1861 ctx.load_verify_locations(SIGNING_CA)
1862 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1863 s.connect(self.server_addr)
1864 cert = s.getpeercert()
1865 self.assertTrue(cert)
1866
1867 def test_connect_with_context_fail(self):
1868 # This should fail because we have no verification certs. Connection
1869 # failure crashes ThreadedEchoServer, so run this in an independent
1870 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001871 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 ctx.verify_mode = ssl.CERT_REQUIRED
1873 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1874 self.addCleanup(s.close)
1875 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1876 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001877
1878 def test_connect_capath(self):
1879 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001880 # NOTE: the subject hashing algorithm has been changed between
1881 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1882 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001883 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001884 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001885 ctx.verify_mode = ssl.CERT_REQUIRED
1886 ctx.load_verify_locations(capath=CAPATH)
1887 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1888 s.connect(self.server_addr)
1889 cert = s.getpeercert()
1890 self.assertTrue(cert)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07001891
Martin Panter3840b2a2016-03-27 01:53:46 +00001892 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001893 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001894 ctx.verify_mode = ssl.CERT_REQUIRED
1895 ctx.load_verify_locations(capath=BYTES_CAPATH)
1896 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1897 s.connect(self.server_addr)
1898 cert = s.getpeercert()
1899 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001900
Christian Heimesefff7062013-11-21 03:35:02 +01001901 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001902 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001903 pem = f.read()
1904 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001905 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 ctx.verify_mode = ssl.CERT_REQUIRED
1907 ctx.load_verify_locations(cadata=pem)
1908 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1909 s.connect(self.server_addr)
1910 cert = s.getpeercert()
1911 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001912
Martin Panter3840b2a2016-03-27 01:53:46 +00001913 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001915 ctx.verify_mode = ssl.CERT_REQUIRED
1916 ctx.load_verify_locations(cadata=der)
1917 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1918 s.connect(self.server_addr)
1919 cert = s.getpeercert()
1920 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001921
Antoine Pitroue3220242010-04-24 11:13:53 +00001922 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1923 def test_makefile_close(self):
1924 # Issue #5238: creating a file-like object with makefile() shouldn't
1925 # delay closing the underlying "real socket" (here tested with its
1926 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001927 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001928 ss.connect(self.server_addr)
1929 fd = ss.fileno()
1930 f = ss.makefile()
1931 f.close()
1932 # The fd is still open
1933 os.read(fd, 0)
1934 # Closing the SSL socket should close the fd too
1935 ss.close()
1936 gc.collect()
1937 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001938 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001940
Antoine Pitrou480a1242010-04-28 21:37:09 +00001941 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 s = socket.socket(socket.AF_INET)
1943 s.connect(self.server_addr)
1944 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001945 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001946 cert_reqs=ssl.CERT_NONE,
1947 do_handshake_on_connect=False)
1948 self.addCleanup(s.close)
1949 count = 0
1950 while True:
1951 try:
1952 count += 1
1953 s.do_handshake()
1954 break
1955 except ssl.SSLWantReadError:
1956 select.select([s], [], [])
1957 except ssl.SSLWantWriteError:
1958 select.select([], [s], [])
1959 if support.verbose:
1960 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001961
Antoine Pitrou480a1242010-04-28 21:37:09 +00001962 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001963 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001964
Martin Panter3840b2a2016-03-27 01:53:46 +00001965 def test_get_server_certificate_fail(self):
1966 # Connection failure crashes ThreadedEchoServer, so run this in an
1967 # independent test method
1968 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001969
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001970 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001971 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001972 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1973 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001974 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001975 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1976 s.connect(self.server_addr)
1977 # Error checking can happen at instantiation or when connecting
1978 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1979 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001980 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001981 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1982 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001983
Christian Heimes9a5395a2013-06-17 15:44:12 +02001984 def test_get_ca_certs_capath(self):
1985 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001986 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001987 ctx.load_verify_locations(capath=CAPATH)
1988 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001989 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1990 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 s.connect(self.server_addr)
1992 cert = s.getpeercert()
1993 self.assertTrue(cert)
1994 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001995
Christian Heimes575596e2013-12-15 21:49:17 +01001996 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001997 def test_context_setget(self):
1998 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001999 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2000 ctx1.load_verify_locations(capath=CAPATH)
2001 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2002 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002003 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002004 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002005 ss.connect(self.server_addr)
2006 self.assertIs(ss.context, ctx1)
2007 self.assertIs(ss._sslobj.context, ctx1)
2008 ss.context = ctx2
2009 self.assertIs(ss.context, ctx2)
2010 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002011
2012 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2013 # A simple IO loop. Call func(*args) depending on the error we get
2014 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2015 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002016 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002017 count = 0
2018 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002019 if time.monotonic() > deadline:
2020 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002021 errno = None
2022 count += 1
2023 try:
2024 ret = func(*args)
2025 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002026 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002027 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002028 raise
2029 errno = e.errno
2030 # Get any data from the outgoing BIO irrespective of any error, and
2031 # send it to the socket.
2032 buf = outgoing.read()
2033 sock.sendall(buf)
2034 # If there's no error, we're done. For WANT_READ, we need to get
2035 # data from the socket and put it in the incoming BIO.
2036 if errno is None:
2037 break
2038 elif errno == ssl.SSL_ERROR_WANT_READ:
2039 buf = sock.recv(32768)
2040 if buf:
2041 incoming.write(buf)
2042 else:
2043 incoming.write_eof()
2044 if support.verbose:
2045 sys.stdout.write("Needed %d calls to complete %s().\n"
2046 % (count, func.__name__))
2047 return ret
2048
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 def test_bio_handshake(self):
2050 sock = socket.socket(socket.AF_INET)
2051 self.addCleanup(sock.close)
2052 sock.connect(self.server_addr)
2053 incoming = ssl.MemoryBIO()
2054 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002055 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2056 self.assertTrue(ctx.check_hostname)
2057 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002058 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002059 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2060 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002061 self.assertIs(sslobj._sslobj.owner, sslobj)
2062 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002063 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002064 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002065 self.assertRaises(ValueError, sslobj.getpeercert)
2066 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2067 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2068 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2069 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002070 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002071 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002072 self.assertTrue(sslobj.getpeercert())
2073 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2074 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2075 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002076 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002077 except ssl.SSLSyscallError:
2078 # If the server shuts down the TCP connection without sending a
2079 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2080 pass
2081 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2082
2083 def test_bio_read_write_data(self):
2084 sock = socket.socket(socket.AF_INET)
2085 self.addCleanup(sock.close)
2086 sock.connect(self.server_addr)
2087 incoming = ssl.MemoryBIO()
2088 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002090 ctx.verify_mode = ssl.CERT_NONE
2091 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2092 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2093 req = b'FOO\n'
2094 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2095 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2096 self.assertEqual(buf, b'foo\n')
2097 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002098
2099
Martin Panter3840b2a2016-03-27 01:53:46 +00002100class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002101
Martin Panter3840b2a2016-03-27 01:53:46 +00002102 def test_timeout_connect_ex(self):
2103 # Issue #12065: on a timeout, connect_ex() should return the original
2104 # errno (mimicking the behaviour of non-SSL sockets).
2105 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002106 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002107 cert_reqs=ssl.CERT_REQUIRED,
2108 do_handshake_on_connect=False)
2109 self.addCleanup(s.close)
2110 s.settimeout(0.0000001)
2111 rc = s.connect_ex((REMOTE_HOST, 443))
2112 if rc == 0:
2113 self.skipTest("REMOTE_HOST responded too quickly")
2114 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2115
2116 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2117 def test_get_server_certificate_ipv6(self):
2118 with support.transient_internet('ipv6.google.com'):
2119 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2120 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2121
Martin Panter3840b2a2016-03-27 01:53:46 +00002122
2123def _test_get_server_certificate(test, host, port, cert=None):
2124 pem = ssl.get_server_certificate((host, port))
2125 if not pem:
2126 test.fail("No server certificate on %s:%s!" % (host, port))
2127
2128 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2129 if not pem:
2130 test.fail("No server certificate on %s:%s!" % (host, port))
2131 if support.verbose:
2132 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2133
2134def _test_get_server_certificate_fail(test, host, port):
2135 try:
2136 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2137 except ssl.SSLError as x:
2138 #should fail
2139 if support.verbose:
2140 sys.stdout.write("%s\n" % x)
2141 else:
2142 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2143
2144
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002145from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002146
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002147class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002148
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002149 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002150
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002151 """A mildly complicated class, because we want it to work both
2152 with and without the SSL wrapper around the socket connection, so
2153 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002154
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002155 def __init__(self, server, connsock, addr):
2156 self.server = server
2157 self.running = False
2158 self.sock = connsock
2159 self.addr = addr
2160 self.sock.setblocking(1)
2161 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002162 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002163 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002164
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002165 def wrap_conn(self):
2166 try:
2167 self.sslconn = self.server.context.wrap_socket(
2168 self.sock, server_side=True)
2169 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2170 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002171 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002172 # We treat ConnectionResetError as though it were an
2173 # SSLError - OpenSSL on Ubuntu abruptly closes the
2174 # connection when asked to use an unsupported protocol.
2175 #
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002176 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2177 # tries to send session tickets after handshake.
2178 # https://github.com/openssl/openssl/issues/6342
2179 self.server.conn_errors.append(str(e))
2180 if self.server.chatty:
2181 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2182 self.running = False
2183 self.close()
2184 return False
2185 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002186 # OSError may occur with wrong protocols, e.g. both
2187 # sides use PROTOCOL_TLS_SERVER.
2188 #
2189 # XXX Various errors can have happened here, for example
2190 # a mismatching protocol version, an invalid certificate,
2191 # or a low-level bug. This should be made more discriminating.
2192 #
2193 # bpo-31323: Store the exception as string to prevent
2194 # a reference leak: server -> conn_errors -> exception
2195 # -> traceback -> self (ConnectionHandler) -> server
2196 self.server.conn_errors.append(str(e))
2197 if self.server.chatty:
2198 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2199 self.running = False
2200 self.server.stop()
2201 self.close()
2202 return False
2203 else:
2204 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2205 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2206 cert = self.sslconn.getpeercert()
2207 if support.verbose and self.server.chatty:
2208 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2209 cert_binary = self.sslconn.getpeercert(True)
2210 if support.verbose and self.server.chatty:
2211 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2212 cipher = self.sslconn.cipher()
2213 if support.verbose and self.server.chatty:
2214 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2215 sys.stdout.write(" server: selected protocol is now "
2216 + str(self.sslconn.selected_npn_protocol()) + "\n")
2217 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002218
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002219 def read(self):
2220 if self.sslconn:
2221 return self.sslconn.read()
2222 else:
2223 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002224
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002225 def write(self, bytes):
2226 if self.sslconn:
2227 return self.sslconn.write(bytes)
2228 else:
2229 return self.sock.send(bytes)
2230
2231 def close(self):
2232 if self.sslconn:
2233 self.sslconn.close()
2234 else:
2235 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002236
Antoine Pitrou480a1242010-04-28 21:37:09 +00002237 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002238 self.running = True
2239 if not self.server.starttls_server:
2240 if not self.wrap_conn():
2241 return
2242 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002243 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002244 msg = self.read()
2245 stripped = msg.strip()
2246 if not stripped:
2247 # eof, so quit this handler
2248 self.running = False
2249 try:
2250 self.sock = self.sslconn.unwrap()
2251 except OSError:
2252 # Many tests shut the TCP connection down
2253 # without an SSL shutdown. This causes
2254 # unwrap() to raise OSError with errno=0!
2255 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002256 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002257 self.sslconn = None
2258 self.close()
2259 elif stripped == b'over':
2260 if support.verbose and self.server.connectionchatty:
2261 sys.stdout.write(" server: client closed connection\n")
2262 self.close()
2263 return
2264 elif (self.server.starttls_server and
2265 stripped == b'STARTTLS'):
2266 if support.verbose and self.server.connectionchatty:
2267 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2268 self.write(b"OK\n")
2269 if not self.wrap_conn():
2270 return
2271 elif (self.server.starttls_server and self.sslconn
2272 and stripped == b'ENDTLS'):
2273 if support.verbose and self.server.connectionchatty:
2274 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2275 self.write(b"OK\n")
2276 self.sock = self.sslconn.unwrap()
2277 self.sslconn = None
2278 if support.verbose and self.server.connectionchatty:
2279 sys.stdout.write(" server: connection is now unencrypted...\n")
2280 elif stripped == b'CB tls-unique':
2281 if support.verbose and self.server.connectionchatty:
2282 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2283 data = self.sslconn.get_channel_binding("tls-unique")
2284 self.write(repr(data).encode("us-ascii") + b"\n")
2285 else:
2286 if (support.verbose and
2287 self.server.connectionchatty):
2288 ctype = (self.sslconn and "encrypted") or "unencrypted"
2289 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2290 % (msg, ctype, msg.lower(), ctype))
2291 self.write(msg.lower())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002292 except ConnectionResetError:
2293 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2294 # when connection is not shut down gracefully.
2295 if self.server.chatty and support.verbose:
2296 sys.stdout.write(
2297 " Connection reset by peer: {}\n".format(
2298 self.addr)
2299 )
2300 self.close()
2301 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002302 except OSError:
2303 if self.server.chatty:
2304 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002305 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002306 self.running = False
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002307
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002308 # normally, we'd just stop here, but for the test
2309 # harness, we want to stop the server
2310 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002311
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002312 def __init__(self, certificate=None, ssl_version=None,
2313 certreqs=None, cacerts=None,
2314 chatty=True, connectionchatty=False, starttls_server=False,
2315 npn_protocols=None, alpn_protocols=None,
2316 ciphers=None, context=None):
2317 if context:
2318 self.context = context
2319 else:
2320 self.context = ssl.SSLContext(ssl_version
2321 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002322 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002323 self.context.verify_mode = (certreqs if certreqs is not None
2324 else ssl.CERT_NONE)
2325 if cacerts:
2326 self.context.load_verify_locations(cacerts)
2327 if certificate:
2328 self.context.load_cert_chain(certificate)
2329 if npn_protocols:
2330 self.context.set_npn_protocols(npn_protocols)
2331 if alpn_protocols:
2332 self.context.set_alpn_protocols(alpn_protocols)
2333 if ciphers:
2334 self.context.set_ciphers(ciphers)
2335 self.chatty = chatty
2336 self.connectionchatty = connectionchatty
2337 self.starttls_server = starttls_server
2338 self.sock = socket.socket()
2339 self.port = support.bind_port(self.sock)
2340 self.flag = None
2341 self.active = False
2342 self.selected_npn_protocols = []
2343 self.selected_alpn_protocols = []
2344 self.shared_ciphers = []
2345 self.conn_errors = []
2346 threading.Thread.__init__(self)
2347 self.daemon = True
2348
2349 def __enter__(self):
2350 self.start(threading.Event())
2351 self.flag.wait()
2352 return self
2353
2354 def __exit__(self, *args):
2355 self.stop()
2356 self.join()
2357
2358 def start(self, flag=None):
2359 self.flag = flag
2360 threading.Thread.start(self)
2361
2362 def run(self):
2363 self.sock.settimeout(0.05)
2364 self.sock.listen()
2365 self.active = True
2366 if self.flag:
2367 # signal an event
2368 self.flag.set()
2369 while self.active:
2370 try:
2371 newconn, connaddr = self.sock.accept()
2372 if support.verbose and self.chatty:
2373 sys.stdout.write(' server: new connection from '
2374 + repr(connaddr) + '\n')
2375 handler = self.ConnectionHandler(self, newconn, connaddr)
2376 handler.start()
2377 handler.join()
2378 except socket.timeout:
2379 pass
2380 except KeyboardInterrupt:
2381 self.stop()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002382 except BaseException as e:
2383 if support.verbose and self.chatty:
2384 sys.stdout.write(
2385 ' connection handling failed: ' + repr(e) + '\n')
2386
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002387 self.sock.close()
2388
2389 def stop(self):
2390 self.active = False
2391
2392class AsyncoreEchoServer(threading.Thread):
2393
2394 # this one's based on asyncore.dispatcher
2395
2396 class EchoServer (asyncore.dispatcher):
2397
2398 class ConnectionHandler(asyncore.dispatcher_with_send):
2399
2400 def __init__(self, conn, certfile):
2401 self.socket = test_wrap_socket(conn, server_side=True,
2402 certfile=certfile,
2403 do_handshake_on_connect=False)
2404 asyncore.dispatcher_with_send.__init__(self, self.socket)
2405 self._ssl_accepting = True
2406 self._do_ssl_handshake()
2407
2408 def readable(self):
2409 if isinstance(self.socket, ssl.SSLSocket):
2410 while self.socket.pending() > 0:
2411 self.handle_read_event()
2412 return True
2413
2414 def _do_ssl_handshake(self):
2415 try:
2416 self.socket.do_handshake()
2417 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2418 return
2419 except ssl.SSLEOFError:
2420 return self.handle_close()
2421 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002422 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002423 except OSError as err:
2424 if err.args[0] == errno.ECONNABORTED:
2425 return self.handle_close()
2426 else:
2427 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002428
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002429 def handle_read(self):
2430 if self._ssl_accepting:
2431 self._do_ssl_handshake()
2432 else:
2433 data = self.recv(1024)
2434 if support.verbose:
2435 sys.stdout.write(" server: read %s from client\n" % repr(data))
2436 if not data:
2437 self.close()
2438 else:
2439 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002440
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 def handle_close(self):
2442 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002443 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002444 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002445
2446 def handle_error(self):
2447 raise
2448
Trent Nelson78520002008-04-10 20:54:35 +00002449 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002450 self.certfile = certfile
2451 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2452 self.port = support.bind_port(sock, '')
2453 asyncore.dispatcher.__init__(self, sock)
2454 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002455
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002456 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002457 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002458 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2459 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002460
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002461 def handle_error(self):
2462 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002463
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002464 def __init__(self, certfile):
2465 self.flag = None
2466 self.active = False
2467 self.server = self.EchoServer(certfile)
2468 self.port = self.server.port
2469 threading.Thread.__init__(self)
2470 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002471
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002472 def __str__(self):
2473 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002474
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 def __enter__(self):
2476 self.start(threading.Event())
2477 self.flag.wait()
2478 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002479
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002480 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002481 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002482 sys.stdout.write(" cleanup: stopping server.\n")
2483 self.stop()
2484 if support.verbose:
2485 sys.stdout.write(" cleanup: joining server thread.\n")
2486 self.join()
2487 if support.verbose:
2488 sys.stdout.write(" cleanup: successfully joined.\n")
2489 # make sure that ConnectionHandler is removed from socket_map
2490 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002491
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002492 def start (self, flag=None):
2493 self.flag = flag
2494 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002495
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002496 def run(self):
2497 self.active = True
2498 if self.flag:
2499 self.flag.set()
2500 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002501 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002502 asyncore.loop(1)
2503 except:
2504 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002505
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002506 def stop(self):
2507 self.active = False
2508 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002509
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002510def server_params_test(client_context, server_context, indata=b"FOO\n",
2511 chatty=True, connectionchatty=False, sni_name=None,
2512 session=None):
2513 """
2514 Launch a server, connect a client to it and try various reads
2515 and writes.
2516 """
2517 stats = {}
2518 server = ThreadedEchoServer(context=server_context,
2519 chatty=chatty,
2520 connectionchatty=False)
2521 with server:
2522 with client_context.wrap_socket(socket.socket(),
2523 server_hostname=sni_name, session=session) as s:
2524 s.connect((HOST, server.port))
2525 for arg in [indata, bytearray(indata), memoryview(indata)]:
2526 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002527 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002528 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002529 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002530 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002531 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002532 if connectionchatty:
2533 if support.verbose:
2534 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002535 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002536 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002537 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2538 % (outdata[:20], len(outdata),
2539 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002540 s.write(b"over\n")
2541 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002542 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002543 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002544 stats.update({
2545 'compression': s.compression(),
2546 'cipher': s.cipher(),
2547 'peercert': s.getpeercert(),
2548 'client_alpn_protocol': s.selected_alpn_protocol(),
2549 'client_npn_protocol': s.selected_npn_protocol(),
2550 'version': s.version(),
2551 'session_reused': s.session_reused,
2552 'session': s.session,
2553 })
2554 s.close()
2555 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2556 stats['server_npn_protocols'] = server.selected_npn_protocols
2557 stats['server_shared_ciphers'] = server.shared_ciphers
2558 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002559
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002560def try_protocol_combo(server_protocol, client_protocol, expect_success,
2561 certsreqs=None, server_options=0, client_options=0):
2562 """
2563 Try to SSL-connect using *client_protocol* to *server_protocol*.
2564 If *expect_success* is true, assert that the connection succeeds,
2565 if it's false, assert that the connection fails.
2566 Also, if *expect_success* is a string, assert that it is the protocol
2567 version actually used by the connection.
2568 """
2569 if certsreqs is None:
2570 certsreqs = ssl.CERT_NONE
2571 certtype = {
2572 ssl.CERT_NONE: "CERT_NONE",
2573 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2574 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2575 }[certsreqs]
2576 if support.verbose:
2577 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2578 sys.stdout.write(formatstr %
2579 (ssl.get_protocol_name(client_protocol),
2580 ssl.get_protocol_name(server_protocol),
2581 certtype))
2582 client_context = ssl.SSLContext(client_protocol)
2583 client_context.options |= client_options
2584 server_context = ssl.SSLContext(server_protocol)
2585 server_context.options |= server_options
2586
2587 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2588 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2589 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002590 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002591 client_context.set_ciphers("ALL")
2592
2593 for ctx in (client_context, server_context):
2594 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002595 ctx.load_cert_chain(SIGNED_CERTFILE)
2596 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002597 try:
2598 stats = server_params_test(client_context, server_context,
2599 chatty=False, connectionchatty=False)
2600 # Protocol mismatch can result in either an SSLError, or a
2601 # "Connection reset by peer" error.
2602 except ssl.SSLError:
2603 if expect_success:
2604 raise
2605 except OSError as e:
2606 if expect_success or e.errno != errno.ECONNRESET:
2607 raise
2608 else:
2609 if not expect_success:
2610 raise AssertionError(
2611 "Client protocol %s succeeded with server protocol %s!"
2612 % (ssl.get_protocol_name(client_protocol),
2613 ssl.get_protocol_name(server_protocol)))
2614 elif (expect_success is not True
2615 and expect_success != stats['version']):
2616 raise AssertionError("version mismatch: expected %r, got %r"
2617 % (expect_success, stats['version']))
2618
2619
2620class ThreadedTests(unittest.TestCase):
2621
2622 @skip_if_broken_ubuntu_ssl
2623 def test_echo(self):
2624 """Basic test of an SSL client connecting to a server"""
2625 if support.verbose:
2626 sys.stdout.write("\n")
2627 for protocol in PROTOCOLS:
2628 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2629 continue
2630 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2631 context = ssl.SSLContext(protocol)
2632 context.load_cert_chain(CERTFILE)
2633 server_params_test(context, context,
2634 chatty=True, connectionchatty=True)
2635
Christian Heimesa170fa12017-09-15 20:27:30 +02002636 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002637
2638 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2639 server_params_test(client_context=client_context,
2640 server_context=server_context,
2641 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002642 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002643
2644 client_context.check_hostname = False
2645 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2646 with self.assertRaises(ssl.SSLError) as e:
2647 server_params_test(client_context=server_context,
2648 server_context=client_context,
2649 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002650 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651 self.assertIn('called a function you should not call',
2652 str(e.exception))
2653
2654 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2655 with self.assertRaises(ssl.SSLError) as e:
2656 server_params_test(client_context=server_context,
2657 server_context=server_context,
2658 chatty=True, connectionchatty=True)
2659 self.assertIn('called a function you should not call',
2660 str(e.exception))
2661
2662 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2663 with self.assertRaises(ssl.SSLError) as e:
2664 server_params_test(client_context=server_context,
2665 server_context=client_context,
2666 chatty=True, connectionchatty=True)
2667 self.assertIn('called a function you should not call',
2668 str(e.exception))
2669
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002670 def test_getpeercert(self):
2671 if support.verbose:
2672 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002673
2674 client_context, server_context, hostname = testing_context()
2675 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002676 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002677 with client_context.wrap_socket(socket.socket(),
2678 do_handshake_on_connect=False,
2679 server_hostname=hostname) as s:
2680 s.connect((HOST, server.port))
2681 # getpeercert() raise ValueError while the handshake isn't
2682 # done.
2683 with self.assertRaises(ValueError):
2684 s.getpeercert()
2685 s.do_handshake()
2686 cert = s.getpeercert()
2687 self.assertTrue(cert, "Can't get peer certificate.")
2688 cipher = s.cipher()
2689 if support.verbose:
2690 sys.stdout.write(pprint.pformat(cert) + '\n')
2691 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2692 if 'subject' not in cert:
2693 self.fail("No subject field in certificate: %s." %
2694 pprint.pformat(cert))
2695 if ((('organizationName', 'Python Software Foundation'),)
2696 not in cert['subject']):
2697 self.fail(
2698 "Missing or invalid 'organizationName' field in certificate subject; "
2699 "should be 'Python Software Foundation'.")
2700 self.assertIn('notBefore', cert)
2701 self.assertIn('notAfter', cert)
2702 before = ssl.cert_time_to_seconds(cert['notBefore'])
2703 after = ssl.cert_time_to_seconds(cert['notAfter'])
2704 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002705
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002706 @unittest.skipUnless(have_verify_flags(),
2707 "verify_flags need OpenSSL > 0.9.8")
2708 def test_crl_check(self):
2709 if support.verbose:
2710 sys.stdout.write("\n")
2711
Christian Heimesa170fa12017-09-15 20:27:30 +02002712 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002713
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002714 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002715 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002716
2717 # VERIFY_DEFAULT should pass
2718 server = ThreadedEchoServer(context=server_context, chatty=True)
2719 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002720 with client_context.wrap_socket(socket.socket(),
2721 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002722 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002723 cert = s.getpeercert()
2724 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002725
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002726 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002727 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002728
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002729 server = ThreadedEchoServer(context=server_context, chatty=True)
2730 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002731 with client_context.wrap_socket(socket.socket(),
2732 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002733 with self.assertRaisesRegex(ssl.SSLError,
2734 "certificate verify failed"):
2735 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002736
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002737 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002738 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002739
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002740 server = ThreadedEchoServer(context=server_context, chatty=True)
2741 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002742 with client_context.wrap_socket(socket.socket(),
2743 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002744 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002745 cert = s.getpeercert()
2746 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002747
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002748 def test_check_hostname(self):
2749 if support.verbose:
2750 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002751
Christian Heimesa170fa12017-09-15 20:27:30 +02002752 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002753
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002754 # correct hostname should verify
2755 server = ThreadedEchoServer(context=server_context, chatty=True)
2756 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002757 with client_context.wrap_socket(socket.socket(),
2758 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002759 s.connect((HOST, server.port))
2760 cert = s.getpeercert()
2761 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002762
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002763 # incorrect hostname should raise an exception
2764 server = ThreadedEchoServer(context=server_context, chatty=True)
2765 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002766 with client_context.wrap_socket(socket.socket(),
2767 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002768 with self.assertRaisesRegex(
2769 ssl.CertificateError,
2770 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002771 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002772
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002773 # missing server_hostname arg should cause an exception, too
2774 server = ThreadedEchoServer(context=server_context, chatty=True)
2775 with server:
2776 with socket.socket() as s:
2777 with self.assertRaisesRegex(ValueError,
2778 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002779 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002781 def test_ecc_cert(self):
2782 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2783 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002784 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002785 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2786
2787 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2788 # load ECC cert
2789 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2790
2791 # correct hostname should verify
2792 server = ThreadedEchoServer(context=server_context, chatty=True)
2793 with server:
2794 with client_context.wrap_socket(socket.socket(),
2795 server_hostname=hostname) as s:
2796 s.connect((HOST, server.port))
2797 cert = s.getpeercert()
2798 self.assertTrue(cert, "Can't get peer certificate.")
2799 cipher = s.cipher()[0].split('-')
2800 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2801
2802 def test_dual_rsa_ecc(self):
2803 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2804 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002805 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2806 # algorithms.
2807 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002808 # only ECDSA certs
2809 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2810 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2811
2812 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2813 # load ECC and RSA key/cert pairs
2814 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2815 server_context.load_cert_chain(SIGNED_CERTFILE)
2816
2817 # correct hostname should verify
2818 server = ThreadedEchoServer(context=server_context, chatty=True)
2819 with server:
2820 with client_context.wrap_socket(socket.socket(),
2821 server_hostname=hostname) as s:
2822 s.connect((HOST, server.port))
2823 cert = s.getpeercert()
2824 self.assertTrue(cert, "Can't get peer certificate.")
2825 cipher = s.cipher()[0].split('-')
2826 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2827
Christian Heimes66e57422018-01-29 14:25:13 +01002828 def test_check_hostname_idn(self):
2829 if support.verbose:
2830 sys.stdout.write("\n")
2831
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002832 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002833 server_context.load_cert_chain(IDNSANSFILE)
2834
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002835 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002836 context.verify_mode = ssl.CERT_REQUIRED
2837 context.check_hostname = True
2838 context.load_verify_locations(SIGNING_CA)
2839
2840 # correct hostname should verify, when specified in several
2841 # different ways
2842 idn_hostnames = [
2843 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002844 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002845 ('xn--knig-5qa.idn.pythontest.net',
2846 'xn--knig-5qa.idn.pythontest.net'),
2847 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002848 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002849
2850 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002851 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002852 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2853 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2854 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002855 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2856
2857 # ('königsgäßchen.idna2008.pythontest.net',
2858 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2859 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2860 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2861 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2862 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2863
Christian Heimes66e57422018-01-29 14:25:13 +01002864 ]
2865 for server_hostname, expected_hostname in idn_hostnames:
2866 server = ThreadedEchoServer(context=server_context, chatty=True)
2867 with server:
2868 with context.wrap_socket(socket.socket(),
2869 server_hostname=server_hostname) as s:
2870 self.assertEqual(s.server_hostname, expected_hostname)
2871 s.connect((HOST, server.port))
2872 cert = s.getpeercert()
2873 self.assertEqual(s.server_hostname, expected_hostname)
2874 self.assertTrue(cert, "Can't get peer certificate.")
2875
Christian Heimes66e57422018-01-29 14:25:13 +01002876 # incorrect hostname should raise an exception
2877 server = ThreadedEchoServer(context=server_context, chatty=True)
2878 with server:
2879 with context.wrap_socket(socket.socket(),
2880 server_hostname="python.example.org") as s:
2881 with self.assertRaises(ssl.CertificateError):
2882 s.connect((HOST, server.port))
2883
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002884 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002885 """Connecting when the server rejects the client's certificate
2886
2887 Launch a server with CERT_REQUIRED, and check that trying to
2888 connect to it with a wrong client certificate fails.
2889 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002890 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002891 # load client cert that is not signed by trusted CA
2892 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002893 # require TLS client authentication
2894 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002895 # TLS 1.3 has different handshake
2896 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002897
2898 server = ThreadedEchoServer(
2899 context=server_context, chatty=True, connectionchatty=True,
2900 )
2901
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002902 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002903 client_context.wrap_socket(socket.socket(),
2904 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002905 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002906 # Expect either an SSL error about the server rejecting
2907 # the connection, or a low-level connection reset (which
2908 # sometimes happens on Windows)
2909 s.connect((HOST, server.port))
2910 except ssl.SSLError as e:
2911 if support.verbose:
2912 sys.stdout.write("\nSSLError is %r\n" % e)
2913 except OSError as e:
2914 if e.errno != errno.ECONNRESET:
2915 raise
2916 if support.verbose:
2917 sys.stdout.write("\nsocket.error is %r\n" % e)
2918 else:
2919 self.fail("Use of invalid cert should have failed!")
2920
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002921 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2922 def test_wrong_cert_tls13(self):
2923 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002924 # load client cert that is not signed by trusted CA
2925 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002926 server_context.verify_mode = ssl.CERT_REQUIRED
2927 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2928 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2929
2930 server = ThreadedEchoServer(
2931 context=server_context, chatty=True, connectionchatty=True,
2932 )
2933 with server, \
2934 client_context.wrap_socket(socket.socket(),
2935 server_hostname=hostname) as s:
2936 # TLS 1.3 perform client cert exchange after handshake
2937 s.connect((HOST, server.port))
2938 try:
2939 s.write(b'data')
2940 s.read(4)
2941 except ssl.SSLError as e:
2942 if support.verbose:
2943 sys.stdout.write("\nSSLError is %r\n" % e)
2944 except OSError as e:
2945 if e.errno != errno.ECONNRESET:
2946 raise
2947 if support.verbose:
2948 sys.stdout.write("\nsocket.error is %r\n" % e)
2949 else:
2950 self.fail("Use of invalid cert should have failed!")
2951
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 def test_rude_shutdown(self):
2953 """A brutal shutdown of an SSL server should raise an OSError
2954 in the client when attempting handshake.
2955 """
2956 listener_ready = threading.Event()
2957 listener_gone = threading.Event()
2958
2959 s = socket.socket()
2960 port = support.bind_port(s, HOST)
2961
2962 # `listener` runs in a thread. It sits in an accept() until
2963 # the main thread connects. Then it rudely closes the socket,
2964 # and sets Event `listener_gone` to let the main thread know
2965 # the socket is gone.
2966 def listener():
2967 s.listen()
2968 listener_ready.set()
2969 newsock, addr = s.accept()
2970 newsock.close()
2971 s.close()
2972 listener_gone.set()
2973
2974 def connector():
2975 listener_ready.wait()
2976 with socket.socket() as c:
2977 c.connect((HOST, port))
2978 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002979 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002980 ssl_sock = test_wrap_socket(c)
2981 except OSError:
2982 pass
2983 else:
2984 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002985
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002986 t = threading.Thread(target=listener)
2987 t.start()
2988 try:
2989 connector()
2990 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002991 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002992
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002993 def test_ssl_cert_verify_error(self):
2994 if support.verbose:
2995 sys.stdout.write("\n")
2996
2997 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2998 server_context.load_cert_chain(SIGNED_CERTFILE)
2999
3000 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3001
3002 server = ThreadedEchoServer(context=server_context, chatty=True)
3003 with server:
3004 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003005 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003006 try:
3007 s.connect((HOST, server.port))
3008 except ssl.SSLError as e:
3009 msg = 'unable to get local issuer certificate'
3010 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3011 self.assertEqual(e.verify_code, 20)
3012 self.assertEqual(e.verify_message, msg)
3013 self.assertIn(msg, repr(e))
3014 self.assertIn('certificate verify failed', repr(e))
3015
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003016 @skip_if_broken_ubuntu_ssl
3017 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3018 "OpenSSL is compiled without SSLv2 support")
3019 def test_protocol_sslv2(self):
3020 """Connecting to an SSLv2 server with various client options"""
3021 if support.verbose:
3022 sys.stdout.write("\n")
3023 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3024 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3025 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003026 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003027 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3028 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3029 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3030 # SSLv23 client with specific SSL options
3031 if no_sslv2_implies_sslv3_hello():
3032 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003033 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003034 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003035 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003036 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003037 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003038 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003039
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003040 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02003041 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003042 """Connecting to an SSLv23 server with various client options"""
3043 if support.verbose:
3044 sys.stdout.write("\n")
3045 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003046 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003047 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003048 except OSError as x:
3049 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3050 if support.verbose:
3051 sys.stdout.write(
3052 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3053 % str(x))
3054 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003055 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3056 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3057 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003058
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003059 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003060 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3061 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3062 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003063
3064 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003065 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3066 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3067 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003068
3069 # Server with specific SSL options
3070 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003071 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003072 server_options=ssl.OP_NO_SSLv3)
3073 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003074 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003075 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003076 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003077 server_options=ssl.OP_NO_TLSv1)
3078
3079
3080 @skip_if_broken_ubuntu_ssl
3081 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3082 "OpenSSL is compiled without SSLv3 support")
3083 def test_protocol_sslv3(self):
3084 """Connecting to an SSLv3 server with various client options"""
3085 if support.verbose:
3086 sys.stdout.write("\n")
3087 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3088 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3089 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3090 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3091 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003092 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 client_options=ssl.OP_NO_SSLv3)
3094 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3095 if no_sslv2_implies_sslv3_hello():
3096 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003097 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003098 False, client_options=ssl.OP_NO_SSLv2)
3099
3100 @skip_if_broken_ubuntu_ssl
3101 def test_protocol_tlsv1(self):
3102 """Connecting to a TLSv1 server with various client options"""
3103 if support.verbose:
3104 sys.stdout.write("\n")
3105 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3106 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3107 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3108 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3109 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3110 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3111 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003112 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003113 client_options=ssl.OP_NO_TLSv1)
3114
3115 @skip_if_broken_ubuntu_ssl
3116 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3117 "TLS version 1.1 not supported.")
3118 def test_protocol_tlsv1_1(self):
3119 """Connecting to a TLSv1.1 server with various client options.
3120 Testing against older TLS versions."""
3121 if support.verbose:
3122 sys.stdout.write("\n")
3123 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3124 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3125 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3126 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3127 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003128 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003129 client_options=ssl.OP_NO_TLSv1_1)
3130
Christian Heimesa170fa12017-09-15 20:27:30 +02003131 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003132 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3133 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3134
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003135 @skip_if_broken_ubuntu_ssl
3136 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3137 "TLS version 1.2 not supported.")
3138 def test_protocol_tlsv1_2(self):
3139 """Connecting to a TLSv1.2 server with various client options.
3140 Testing against older TLS versions."""
3141 if support.verbose:
3142 sys.stdout.write("\n")
3143 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3144 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3145 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3146 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3147 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3148 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3149 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003150 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003151 client_options=ssl.OP_NO_TLSv1_2)
3152
Christian Heimesa170fa12017-09-15 20:27:30 +02003153 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003154 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3155 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3156 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3157 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3158
3159 def test_starttls(self):
3160 """Switching from clear text to encrypted and back again."""
3161 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3162
3163 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003164 starttls_server=True,
3165 chatty=True,
3166 connectionchatty=True)
3167 wrapped = False
3168 with server:
3169 s = socket.socket()
3170 s.setblocking(1)
3171 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003172 if support.verbose:
3173 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003174 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003175 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003176 sys.stdout.write(
3177 " client: sending %r...\n" % indata)
3178 if wrapped:
3179 conn.write(indata)
3180 outdata = conn.read()
3181 else:
3182 s.send(indata)
3183 outdata = s.recv(1024)
3184 msg = outdata.strip().lower()
3185 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3186 # STARTTLS ok, switch to secure mode
3187 if support.verbose:
3188 sys.stdout.write(
3189 " client: read %r from server, starting TLS...\n"
3190 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003191 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003192 wrapped = True
3193 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3194 # ENDTLS ok, switch back to clear text
3195 if support.verbose:
3196 sys.stdout.write(
3197 " client: read %r from server, ending TLS...\n"
3198 % msg)
3199 s = conn.unwrap()
3200 wrapped = False
3201 else:
3202 if support.verbose:
3203 sys.stdout.write(
3204 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003205 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003206 sys.stdout.write(" client: closing connection.\n")
3207 if wrapped:
3208 conn.write(b"over\n")
3209 else:
3210 s.send(b"over\n")
3211 if wrapped:
3212 conn.close()
3213 else:
3214 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003215
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003216 def test_socketserver(self):
3217 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003218 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 # try to connect
3220 if support.verbose:
3221 sys.stdout.write('\n')
3222 with open(CERTFILE, 'rb') as f:
3223 d1 = f.read()
3224 d2 = ''
3225 # now fetch the same data from the HTTPS server
3226 url = 'https://localhost:%d/%s' % (
3227 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003228 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003229 f = urllib.request.urlopen(url, context=context)
3230 try:
3231 dlen = f.info().get("content-length")
3232 if dlen and (int(dlen) > 0):
3233 d2 = f.read(int(dlen))
3234 if support.verbose:
3235 sys.stdout.write(
3236 " client: read %d bytes from remote server '%s'\n"
3237 % (len(d2), server))
3238 finally:
3239 f.close()
3240 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003241
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003242 def test_asyncore_server(self):
3243 """Check the example asyncore integration."""
3244 if support.verbose:
3245 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003246
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003247 indata = b"FOO\n"
3248 server = AsyncoreEchoServer(CERTFILE)
3249 with server:
3250 s = test_wrap_socket(socket.socket())
3251 s.connect(('127.0.0.1', server.port))
3252 if support.verbose:
3253 sys.stdout.write(
3254 " client: sending %r...\n" % indata)
3255 s.write(indata)
3256 outdata = s.read()
3257 if support.verbose:
3258 sys.stdout.write(" client: read %r\n" % outdata)
3259 if outdata != indata.lower():
3260 self.fail(
3261 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3262 % (outdata[:20], len(outdata),
3263 indata[:20].lower(), len(indata)))
3264 s.write(b"over\n")
3265 if support.verbose:
3266 sys.stdout.write(" client: closing connection.\n")
3267 s.close()
3268 if support.verbose:
3269 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003270
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003271 def test_recv_send(self):
3272 """Test recv(), send() and friends."""
3273 if support.verbose:
3274 sys.stdout.write("\n")
3275
3276 server = ThreadedEchoServer(CERTFILE,
3277 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003278 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003279 cacerts=CERTFILE,
3280 chatty=True,
3281 connectionchatty=False)
3282 with server:
3283 s = test_wrap_socket(socket.socket(),
3284 server_side=False,
3285 certfile=CERTFILE,
3286 ca_certs=CERTFILE,
3287 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003288 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 s.connect((HOST, server.port))
3290 # helper methods for standardising recv* method signatures
3291 def _recv_into():
3292 b = bytearray(b"\0"*100)
3293 count = s.recv_into(b)
3294 return b[:count]
3295
3296 def _recvfrom_into():
3297 b = bytearray(b"\0"*100)
3298 count, addr = s.recvfrom_into(b)
3299 return b[:count]
3300
3301 # (name, method, expect success?, *args, return value func)
3302 send_methods = [
3303 ('send', s.send, True, [], len),
3304 ('sendto', s.sendto, False, ["some.address"], len),
3305 ('sendall', s.sendall, True, [], lambda x: None),
3306 ]
3307 # (name, method, whether to expect success, *args)
3308 recv_methods = [
3309 ('recv', s.recv, True, []),
3310 ('recvfrom', s.recvfrom, False, ["some.address"]),
3311 ('recv_into', _recv_into, True, []),
3312 ('recvfrom_into', _recvfrom_into, False, []),
3313 ]
3314 data_prefix = "PREFIX_"
3315
3316 for (meth_name, send_meth, expect_success, args,
3317 ret_val_meth) in send_methods:
3318 indata = (data_prefix + meth_name).encode('ascii')
3319 try:
3320 ret = send_meth(indata, *args)
3321 msg = "sending with {}".format(meth_name)
3322 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3323 outdata = s.read()
3324 if outdata != indata.lower():
3325 self.fail(
3326 "While sending with <<{name:s}>> bad data "
3327 "<<{outdata:r}>> ({nout:d}) received; "
3328 "expected <<{indata:r}>> ({nin:d})\n".format(
3329 name=meth_name, outdata=outdata[:20],
3330 nout=len(outdata),
3331 indata=indata[:20], nin=len(indata)
3332 )
3333 )
3334 except ValueError as e:
3335 if expect_success:
3336 self.fail(
3337 "Failed to send with method <<{name:s}>>; "
3338 "expected to succeed.\n".format(name=meth_name)
3339 )
3340 if not str(e).startswith(meth_name):
3341 self.fail(
3342 "Method <<{name:s}>> failed with unexpected "
3343 "exception message: {exp:s}\n".format(
3344 name=meth_name, exp=e
3345 )
3346 )
3347
3348 for meth_name, recv_meth, expect_success, args in recv_methods:
3349 indata = (data_prefix + meth_name).encode('ascii')
3350 try:
3351 s.send(indata)
3352 outdata = recv_meth(*args)
3353 if outdata != indata.lower():
3354 self.fail(
3355 "While receiving with <<{name:s}>> bad data "
3356 "<<{outdata:r}>> ({nout:d}) received; "
3357 "expected <<{indata:r}>> ({nin:d})\n".format(
3358 name=meth_name, outdata=outdata[:20],
3359 nout=len(outdata),
3360 indata=indata[:20], nin=len(indata)
3361 )
3362 )
3363 except ValueError as e:
3364 if expect_success:
3365 self.fail(
3366 "Failed to receive with method <<{name:s}>>; "
3367 "expected to succeed.\n".format(name=meth_name)
3368 )
3369 if not str(e).startswith(meth_name):
3370 self.fail(
3371 "Method <<{name:s}>> failed with unexpected "
3372 "exception message: {exp:s}\n".format(
3373 name=meth_name, exp=e
3374 )
3375 )
3376 # consume data
3377 s.read()
3378
3379 # read(-1, buffer) is supported, even though read(-1) is not
3380 data = b"data"
3381 s.send(data)
3382 buffer = bytearray(len(data))
3383 self.assertEqual(s.read(-1, buffer), len(data))
3384 self.assertEqual(buffer, data)
3385
Christian Heimes888bbdc2017-09-07 14:18:21 -07003386 # sendall accepts bytes-like objects
3387 if ctypes is not None:
3388 ubyte = ctypes.c_ubyte * len(data)
3389 byteslike = ubyte.from_buffer_copy(data)
3390 s.sendall(byteslike)
3391 self.assertEqual(s.read(), data)
3392
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003393 # Make sure sendmsg et al are disallowed to avoid
3394 # inadvertent disclosure of data and/or corruption
3395 # of the encrypted data stream
3396 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3397 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3398 self.assertRaises(NotImplementedError,
3399 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003400 s.write(b"over\n")
3401
3402 self.assertRaises(ValueError, s.recv, -1)
3403 self.assertRaises(ValueError, s.read, -1)
3404
3405 s.close()
3406
3407 def test_recv_zero(self):
3408 server = ThreadedEchoServer(CERTFILE)
3409 server.__enter__()
3410 self.addCleanup(server.__exit__, None, None)
3411 s = socket.create_connection((HOST, server.port))
3412 self.addCleanup(s.close)
3413 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3414 self.addCleanup(s.close)
3415
3416 # recv/read(0) should return no data
3417 s.send(b"data")
3418 self.assertEqual(s.recv(0), b"")
3419 self.assertEqual(s.read(0), b"")
3420 self.assertEqual(s.read(), b"data")
3421
3422 # Should not block if the other end sends no data
3423 s.setblocking(False)
3424 self.assertEqual(s.recv(0), b"")
3425 self.assertEqual(s.recv_into(bytearray()), 0)
3426
3427 def test_nonblocking_send(self):
3428 server = ThreadedEchoServer(CERTFILE,
3429 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003430 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003431 cacerts=CERTFILE,
3432 chatty=True,
3433 connectionchatty=False)
3434 with server:
3435 s = test_wrap_socket(socket.socket(),
3436 server_side=False,
3437 certfile=CERTFILE,
3438 ca_certs=CERTFILE,
3439 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003440 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003441 s.connect((HOST, server.port))
3442 s.setblocking(False)
3443
3444 # If we keep sending data, at some point the buffers
3445 # will be full and the call will block
3446 buf = bytearray(8192)
3447 def fill_buffer():
3448 while True:
3449 s.send(buf)
3450 self.assertRaises((ssl.SSLWantWriteError,
3451 ssl.SSLWantReadError), fill_buffer)
3452
3453 # Now read all the output and discard it
3454 s.setblocking(True)
3455 s.close()
3456
3457 def test_handshake_timeout(self):
3458 # Issue #5103: SSL handshake must respect the socket timeout
3459 server = socket.socket(socket.AF_INET)
3460 host = "127.0.0.1"
3461 port = support.bind_port(server)
3462 started = threading.Event()
3463 finish = False
3464
3465 def serve():
3466 server.listen()
3467 started.set()
3468 conns = []
3469 while not finish:
3470 r, w, e = select.select([server], [], [], 0.1)
3471 if server in r:
3472 # Let the socket hang around rather than having
3473 # it closed by garbage collection.
3474 conns.append(server.accept()[0])
3475 for sock in conns:
3476 sock.close()
3477
3478 t = threading.Thread(target=serve)
3479 t.start()
3480 started.wait()
3481
3482 try:
3483 try:
3484 c = socket.socket(socket.AF_INET)
3485 c.settimeout(0.2)
3486 c.connect((host, port))
3487 # Will attempt handshake and time out
3488 self.assertRaisesRegex(socket.timeout, "timed out",
3489 test_wrap_socket, c)
3490 finally:
3491 c.close()
3492 try:
3493 c = socket.socket(socket.AF_INET)
3494 c = test_wrap_socket(c)
3495 c.settimeout(0.2)
3496 # Will attempt handshake and time out
3497 self.assertRaisesRegex(socket.timeout, "timed out",
3498 c.connect, (host, port))
3499 finally:
3500 c.close()
3501 finally:
3502 finish = True
3503 t.join()
3504 server.close()
3505
3506 def test_server_accept(self):
3507 # Issue #16357: accept() on a SSLSocket created through
3508 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003509 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003510 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003511 context.load_verify_locations(SIGNING_CA)
3512 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003513 server = socket.socket(socket.AF_INET)
3514 host = "127.0.0.1"
3515 port = support.bind_port(server)
3516 server = context.wrap_socket(server, server_side=True)
3517 self.assertTrue(server.server_side)
3518
3519 evt = threading.Event()
3520 remote = None
3521 peer = None
3522 def serve():
3523 nonlocal remote, peer
3524 server.listen()
3525 # Block on the accept and wait on the connection to close.
3526 evt.set()
3527 remote, peer = server.accept()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003528 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003529
3530 t = threading.Thread(target=serve)
3531 t.start()
3532 # Client wait until server setup and perform a connect.
3533 evt.wait()
3534 client = context.wrap_socket(socket.socket())
3535 client.connect((host, port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003536 client.send(b'data')
3537 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003538 client_addr = client.getsockname()
3539 client.close()
3540 t.join()
3541 remote.close()
3542 server.close()
3543 # Sanity checks.
3544 self.assertIsInstance(remote, ssl.SSLSocket)
3545 self.assertEqual(peer, client_addr)
3546
3547 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003548 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003549 with context.wrap_socket(socket.socket()) as sock:
3550 with self.assertRaises(OSError) as cm:
3551 sock.getpeercert()
3552 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3553
3554 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003555 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003556 with context.wrap_socket(socket.socket()) as sock:
3557 with self.assertRaises(OSError) as cm:
3558 sock.do_handshake()
3559 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3560
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003561 def test_no_shared_ciphers(self):
3562 client_context, server_context, hostname = testing_context()
3563 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3564 client_context.options |= ssl.OP_NO_TLSv1_3
3565 # Force different suites on client and master
3566 client_context.set_ciphers("AES128")
3567 server_context.set_ciphers("AES256")
3568 with ThreadedEchoServer(context=server_context) as server:
3569 with client_context.wrap_socket(socket.socket(),
3570 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003571 with self.assertRaises(OSError):
3572 s.connect((HOST, server.port))
3573 self.assertIn("no shared cipher", server.conn_errors[0])
3574
3575 def test_version_basic(self):
3576 """
3577 Basic tests for SSLSocket.version().
3578 More tests are done in the test_protocol_*() methods.
3579 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003580 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3581 context.check_hostname = False
3582 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003583 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003584 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003585 chatty=False) as server:
3586 with context.wrap_socket(socket.socket()) as s:
3587 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003588 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003589 s.connect((HOST, server.port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003590 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003591 self.assertEqual(s.version(), 'TLSv1.3')
3592 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003593 self.assertEqual(s.version(), 'TLSv1.2')
3594 else: # 0.9.8 to 1.0.1
3595 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003596 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003597 self.assertIs(s.version(), None)
3598
Christian Heimescb5b68a2017-09-07 18:07:00 -07003599 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3600 "test requires TLSv1.3 enabled OpenSSL")
3601 def test_tls1_3(self):
3602 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3603 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003604 context.options |= (
3605 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3606 )
3607 with ThreadedEchoServer(context=context) as server:
3608 with context.wrap_socket(socket.socket()) as s:
3609 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003610 self.assertIn(s.cipher()[0], {
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003611 'TLS_AES_256_GCM_SHA384',
3612 'TLS_CHACHA20_POLY1305_SHA256',
3613 'TLS_AES_128_GCM_SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003614 })
3615 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003616
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08003617 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3618 "required OpenSSL 1.1.0g")
3619 def test_min_max_version(self):
3620 client_context, server_context, hostname = testing_context()
3621 # client TLSv1.0 to 1.2
3622 client_context.minimum_version = ssl.TLSVersion.TLSv1
3623 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3624 # server only TLSv1.2
3625 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3626 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3627
3628 with ThreadedEchoServer(context=server_context) as server:
3629 with client_context.wrap_socket(socket.socket(),
3630 server_hostname=hostname) as s:
3631 s.connect((HOST, server.port))
3632 self.assertEqual(s.version(), 'TLSv1.2')
3633
3634 # client 1.0 to 1.2, server 1.0 to 1.1
3635 server_context.minimum_version = ssl.TLSVersion.TLSv1
3636 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3637
3638 with ThreadedEchoServer(context=server_context) as server:
3639 with client_context.wrap_socket(socket.socket(),
3640 server_hostname=hostname) as s:
3641 s.connect((HOST, server.port))
3642 self.assertEqual(s.version(), 'TLSv1.1')
3643
3644 # client 1.0, server 1.2 (mismatch)
3645 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3646 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3647 client_context.minimum_version = ssl.TLSVersion.TLSv1
3648 client_context.maximum_version = ssl.TLSVersion.TLSv1
3649 with ThreadedEchoServer(context=server_context) as server:
3650 with client_context.wrap_socket(socket.socket(),
3651 server_hostname=hostname) as s:
3652 with self.assertRaises(ssl.SSLError) as e:
3653 s.connect((HOST, server.port))
3654 self.assertIn("alert", str(e.exception))
3655
3656
3657 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3658 "required OpenSSL 1.1.0g")
3659 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3660 def test_min_max_version_sslv3(self):
3661 client_context, server_context, hostname = testing_context()
3662 server_context.minimum_version = ssl.TLSVersion.SSLv3
3663 client_context.minimum_version = ssl.TLSVersion.SSLv3
3664 client_context.maximum_version = ssl.TLSVersion.SSLv3
3665 with ThreadedEchoServer(context=server_context) as server:
3666 with client_context.wrap_socket(socket.socket(),
3667 server_hostname=hostname) as s:
3668 s.connect((HOST, server.port))
3669 self.assertEqual(s.version(), 'SSLv3')
3670
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003671 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3672 def test_default_ecdh_curve(self):
3673 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3674 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003675 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003676 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003677 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3678 # cipher name.
3679 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003680 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3681 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3682 # our default cipher list should prefer ECDH-based ciphers
3683 # automatically.
3684 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3685 context.set_ciphers("ECCdraft:ECDH")
3686 with ThreadedEchoServer(context=context) as server:
3687 with context.wrap_socket(socket.socket()) as s:
3688 s.connect((HOST, server.port))
3689 self.assertIn("ECDH", s.cipher()[0])
3690
3691 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3692 "'tls-unique' channel binding not available")
3693 def test_tls_unique_channel_binding(self):
3694 """Test tls-unique channel binding."""
3695 if support.verbose:
3696 sys.stdout.write("\n")
3697
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003698 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003699
3700 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003701 chatty=True,
3702 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003703
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003704 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003705 with client_context.wrap_socket(
3706 socket.socket(),
3707 server_hostname=hostname) as s:
3708 s.connect((HOST, server.port))
3709 # get the data
3710 cb_data = s.get_channel_binding("tls-unique")
3711 if support.verbose:
3712 sys.stdout.write(
3713 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003714
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003715 # check if it is sane
3716 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003717 if s.version() == 'TLSv1.3':
3718 self.assertEqual(len(cb_data), 48)
3719 else:
3720 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003721
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003722 # and compare with the peers version
3723 s.write(b"CB tls-unique\n")
3724 peer_data_repr = s.read().strip()
3725 self.assertEqual(peer_data_repr,
3726 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727
3728 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003729 with client_context.wrap_socket(
3730 socket.socket(),
3731 server_hostname=hostname) as s:
3732 s.connect((HOST, server.port))
3733 new_cb_data = s.get_channel_binding("tls-unique")
3734 if support.verbose:
3735 sys.stdout.write(
3736 "got another channel binding data: {0!r}\n".format(
3737 new_cb_data)
3738 )
3739 # is it really unique
3740 self.assertNotEqual(cb_data, new_cb_data)
3741 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003742 if s.version() == 'TLSv1.3':
3743 self.assertEqual(len(cb_data), 48)
3744 else:
3745 self.assertEqual(len(cb_data), 12) # True for TLSv1
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003746 s.write(b"CB tls-unique\n")
3747 peer_data_repr = s.read().strip()
3748 self.assertEqual(peer_data_repr,
3749 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003750
3751 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003752 client_context, server_context, hostname = testing_context()
3753 stats = server_params_test(client_context, server_context,
3754 chatty=True, connectionchatty=True,
3755 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003756 if support.verbose:
3757 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3758 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3759
3760 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3761 "ssl.OP_NO_COMPRESSION needed for this test")
3762 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003763 client_context, server_context, hostname = testing_context()
3764 client_context.options |= ssl.OP_NO_COMPRESSION
3765 server_context.options |= ssl.OP_NO_COMPRESSION
3766 stats = server_params_test(client_context, server_context,
3767 chatty=True, connectionchatty=True,
3768 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769 self.assertIs(stats['compression'], None)
3770
3771 def test_dh_params(self):
3772 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003773 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003774 # test scenario needs TLS <= 1.2
3775 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003776 server_context.load_dh_params(DHFILE)
3777 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003778 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003779 stats = server_params_test(client_context, server_context,
3780 chatty=True, connectionchatty=True,
3781 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003782 cipher = stats["cipher"][0]
3783 parts = cipher.split("-")
3784 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3785 self.fail("Non-DH cipher: " + cipher[0])
3786
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003787 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003788 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003789 def test_ecdh_curve(self):
3790 # server secp384r1, client auto
3791 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003792
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003793 server_context.set_ecdh_curve("secp384r1")
3794 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3795 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3796 stats = server_params_test(client_context, server_context,
3797 chatty=True, connectionchatty=True,
3798 sni_name=hostname)
3799
3800 # server auto, client secp384r1
3801 client_context, server_context, hostname = testing_context()
3802 client_context.set_ecdh_curve("secp384r1")
3803 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3804 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3805 stats = server_params_test(client_context, server_context,
3806 chatty=True, connectionchatty=True,
3807 sni_name=hostname)
3808
3809 # server / client curve mismatch
3810 client_context, server_context, hostname = testing_context()
3811 client_context.set_ecdh_curve("prime256v1")
3812 server_context.set_ecdh_curve("secp384r1")
3813 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3814 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3815 try:
3816 stats = server_params_test(client_context, server_context,
3817 chatty=True, connectionchatty=True,
3818 sni_name=hostname)
3819 except ssl.SSLError:
3820 pass
3821 else:
3822 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003823 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003824 self.fail("mismatch curve did not fail")
3825
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003826 def test_selected_alpn_protocol(self):
3827 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003828 client_context, server_context, hostname = testing_context()
3829 stats = server_params_test(client_context, server_context,
3830 chatty=True, connectionchatty=True,
3831 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003832 self.assertIs(stats['client_alpn_protocol'], None)
3833
3834 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3835 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3836 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003837 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003838 server_context.set_alpn_protocols(['foo', 'bar'])
3839 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003840 chatty=True, connectionchatty=True,
3841 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003842 self.assertIs(stats['client_alpn_protocol'], None)
3843
3844 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3845 def test_alpn_protocols(self):
3846 server_protocols = ['foo', 'bar', 'milkshake']
3847 protocol_tests = [
3848 (['foo', 'bar'], 'foo'),
3849 (['bar', 'foo'], 'foo'),
3850 (['milkshake'], 'milkshake'),
3851 (['http/3.0', 'http/4.0'], None)
3852 ]
3853 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003854 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003855 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003856 client_context.set_alpn_protocols(client_protocols)
3857
3858 try:
3859 stats = server_params_test(client_context,
3860 server_context,
3861 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003862 connectionchatty=True,
3863 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003864 except ssl.SSLError as e:
3865 stats = e
3866
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003867 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003868 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3869 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3870 self.assertIsInstance(stats, ssl.SSLError)
3871 else:
3872 msg = "failed trying %s (s) and %s (c).\n" \
3873 "was expecting %s, but got %%s from the %%s" \
3874 % (str(server_protocols), str(client_protocols),
3875 str(expected))
3876 client_result = stats['client_alpn_protocol']
3877 self.assertEqual(client_result, expected,
3878 msg % (client_result, "client"))
3879 server_result = stats['server_alpn_protocols'][-1] \
3880 if len(stats['server_alpn_protocols']) else 'nothing'
3881 self.assertEqual(server_result, expected,
3882 msg % (server_result, "server"))
3883
3884 def test_selected_npn_protocol(self):
3885 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003886 client_context, server_context, hostname = testing_context()
3887 stats = server_params_test(client_context, server_context,
3888 chatty=True, connectionchatty=True,
3889 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003890 self.assertIs(stats['client_npn_protocol'], None)
3891
3892 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3893 def test_npn_protocols(self):
3894 server_protocols = ['http/1.1', 'spdy/2']
3895 protocol_tests = [
3896 (['http/1.1', 'spdy/2'], 'http/1.1'),
3897 (['spdy/2', 'http/1.1'], 'http/1.1'),
3898 (['spdy/2', 'test'], 'spdy/2'),
3899 (['abc', 'def'], 'abc')
3900 ]
3901 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003902 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003903 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003904 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003905 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003906 chatty=True, connectionchatty=True,
3907 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003908 msg = "failed trying %s (s) and %s (c).\n" \
3909 "was expecting %s, but got %%s from the %%s" \
3910 % (str(server_protocols), str(client_protocols),
3911 str(expected))
3912 client_result = stats['client_npn_protocol']
3913 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3914 server_result = stats['server_npn_protocols'][-1] \
3915 if len(stats['server_npn_protocols']) else 'nothing'
3916 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3917
3918 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003919 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003920 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003921 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003922 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003923 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003924 client_context.load_verify_locations(SIGNING_CA)
3925 return server_context, other_context, client_context
3926
3927 def check_common_name(self, stats, name):
3928 cert = stats['peercert']
3929 self.assertIn((('commonName', name),), cert['subject'])
3930
3931 @needs_sni
3932 def test_sni_callback(self):
3933 calls = []
3934 server_context, other_context, client_context = self.sni_contexts()
3935
Christian Heimesa170fa12017-09-15 20:27:30 +02003936 client_context.check_hostname = False
3937
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003938 def servername_cb(ssl_sock, server_name, initial_context):
3939 calls.append((server_name, initial_context))
3940 if server_name is not None:
3941 ssl_sock.context = other_context
3942 server_context.set_servername_callback(servername_cb)
3943
3944 stats = server_params_test(client_context, server_context,
3945 chatty=True,
3946 sni_name='supermessage')
3947 # The hostname was fetched properly, and the certificate was
3948 # changed for the connection.
3949 self.assertEqual(calls, [("supermessage", server_context)])
3950 # CERTFILE4 was selected
3951 self.check_common_name(stats, 'fakehostname')
3952
3953 calls = []
3954 # The callback is called with server_name=None
3955 stats = server_params_test(client_context, server_context,
3956 chatty=True,
3957 sni_name=None)
3958 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003959 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003960
3961 # Check disabling the callback
3962 calls = []
3963 server_context.set_servername_callback(None)
3964
3965 stats = server_params_test(client_context, server_context,
3966 chatty=True,
3967 sni_name='notfunny')
3968 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003969 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003970 self.assertEqual(calls, [])
3971
3972 @needs_sni
3973 def test_sni_callback_alert(self):
3974 # Returning a TLS alert is reflected to the connecting client
3975 server_context, other_context, client_context = self.sni_contexts()
3976
3977 def cb_returning_alert(ssl_sock, server_name, initial_context):
3978 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3979 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003980 with self.assertRaises(ssl.SSLError) as cm:
3981 stats = server_params_test(client_context, server_context,
3982 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003983 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003984 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003985
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003986 @needs_sni
3987 def test_sni_callback_raising(self):
3988 # Raising fails the connection with a TLS handshake failure alert.
3989 server_context, other_context, client_context = self.sni_contexts()
3990
3991 def cb_raising(ssl_sock, server_name, initial_context):
3992 1/0
3993 server_context.set_servername_callback(cb_raising)
3994
3995 with self.assertRaises(ssl.SSLError) as cm, \
3996 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003997 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003998 chatty=False,
3999 sni_name='supermessage')
4000 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4001 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004002
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004003 @needs_sni
4004 def test_sni_callback_wrong_return_type(self):
4005 # Returning the wrong return type terminates the TLS connection
4006 # with an internal error alert.
4007 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004008
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004009 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4010 return "foo"
4011 server_context.set_servername_callback(cb_wrong_return_type)
4012
4013 with self.assertRaises(ssl.SSLError) as cm, \
4014 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004015 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004016 chatty=False,
4017 sni_name='supermessage')
4018 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4019 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004020
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004021 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004022 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004023 client_context.set_ciphers("AES128:AES256")
4024 server_context.set_ciphers("AES256")
4025 expected_algs = [
4026 "AES256", "AES-256",
4027 # TLS 1.3 ciphers are always enabled
4028 "TLS_CHACHA20", "TLS_AES",
4029 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004030
Christian Heimesa170fa12017-09-15 20:27:30 +02004031 stats = server_params_test(client_context, server_context,
4032 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 ciphers = stats['server_shared_ciphers'][0]
4034 self.assertGreater(len(ciphers), 0)
4035 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004036 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004037 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004038
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004039 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004040 client_context, server_context, hostname = testing_context()
4041 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004042
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004043 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004044 s = client_context.wrap_socket(socket.socket(),
4045 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004046 s.connect((HOST, server.port))
4047 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004048
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004049 self.assertRaises(ValueError, s.read, 1024)
4050 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004051
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004052 def test_sendfile(self):
4053 TEST_DATA = b"x" * 512
4054 with open(support.TESTFN, 'wb') as f:
4055 f.write(TEST_DATA)
4056 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004057 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004058 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004059 context.load_verify_locations(SIGNING_CA)
4060 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004061 server = ThreadedEchoServer(context=context, chatty=False)
4062 with server:
4063 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004064 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004065 with open(support.TESTFN, 'rb') as file:
4066 s.sendfile(file)
4067 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004068
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004069 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004070 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004071 # TODO: sessions aren't compatible with TLSv1.3 yet
4072 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004073
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004074 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004075 stats = server_params_test(client_context, server_context,
4076 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004077 session = stats['session']
4078 self.assertTrue(session.id)
4079 self.assertGreater(session.time, 0)
4080 self.assertGreater(session.timeout, 0)
4081 self.assertTrue(session.has_ticket)
4082 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4083 self.assertGreater(session.ticket_lifetime_hint, 0)
4084 self.assertFalse(stats['session_reused'])
4085 sess_stat = server_context.session_stats()
4086 self.assertEqual(sess_stat['accept'], 1)
4087 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004088
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004089 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004090 stats = server_params_test(client_context, server_context,
4091 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004092 sess_stat = server_context.session_stats()
4093 self.assertEqual(sess_stat['accept'], 2)
4094 self.assertEqual(sess_stat['hits'], 1)
4095 self.assertTrue(stats['session_reused'])
4096 session2 = stats['session']
4097 self.assertEqual(session2.id, session.id)
4098 self.assertEqual(session2, session)
4099 self.assertIsNot(session2, session)
4100 self.assertGreaterEqual(session2.time, session.time)
4101 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004102
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004103 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004104 stats = server_params_test(client_context, server_context,
4105 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004106 self.assertFalse(stats['session_reused'])
4107 session3 = stats['session']
4108 self.assertNotEqual(session3.id, session.id)
4109 self.assertNotEqual(session3, session)
4110 sess_stat = server_context.session_stats()
4111 self.assertEqual(sess_stat['accept'], 3)
4112 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004113
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004114 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004115 stats = server_params_test(client_context, server_context,
4116 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004117 self.assertTrue(stats['session_reused'])
4118 session4 = stats['session']
4119 self.assertEqual(session4.id, session.id)
4120 self.assertEqual(session4, session)
4121 self.assertGreaterEqual(session4.time, session.time)
4122 self.assertGreaterEqual(session4.timeout, session.timeout)
4123 sess_stat = server_context.session_stats()
4124 self.assertEqual(sess_stat['accept'], 4)
4125 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004126
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004127 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004128 client_context, server_context, hostname = testing_context()
4129 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004130
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004131 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004132 client_context.options |= ssl.OP_NO_TLSv1_3
4133 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004134
Christian Heimesa170fa12017-09-15 20:27:30 +02004135 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004136 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004137 with client_context.wrap_socket(socket.socket(),
4138 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004139 # session is None before handshake
4140 self.assertEqual(s.session, None)
4141 self.assertEqual(s.session_reused, None)
4142 s.connect((HOST, server.port))
4143 session = s.session
4144 self.assertTrue(session)
4145 with self.assertRaises(TypeError) as e:
4146 s.session = object
Miss Islington (bot)42198572018-06-11 17:58:06 -07004147 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004148
Christian Heimesa170fa12017-09-15 20:27:30 +02004149 with client_context.wrap_socket(socket.socket(),
4150 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004151 s.connect((HOST, server.port))
4152 # cannot set session after handshake
4153 with self.assertRaises(ValueError) as e:
4154 s.session = session
4155 self.assertEqual(str(e.exception),
4156 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004157
Christian Heimesa170fa12017-09-15 20:27:30 +02004158 with client_context.wrap_socket(socket.socket(),
4159 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004160 # can set session before handshake and before the
4161 # connection was established
4162 s.session = session
4163 s.connect((HOST, server.port))
4164 self.assertEqual(s.session.id, session.id)
4165 self.assertEqual(s.session, session)
4166 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004167
Christian Heimesa170fa12017-09-15 20:27:30 +02004168 with client_context2.wrap_socket(socket.socket(),
4169 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004170 # cannot re-use session with a different SSLContext
4171 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004172 s.session = session
4173 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004174 self.assertEqual(str(e.exception),
4175 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004176
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004177
Thomas Woutersed03b412007-08-28 21:37:11 +00004178def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004179 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004180 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004181 plats = {
4182 'Linux': platform.linux_distribution,
4183 'Mac': platform.mac_ver,
4184 'Windows': platform.win32_ver,
4185 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03004186 with warnings.catch_warnings():
4187 warnings.filterwarnings(
4188 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04004189 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03004190 'functions are deprecated .*',
Xtreakf5770f32018-07-05 22:29:46 +05304191 DeprecationWarning,
Berker Peksag9e7990a2015-05-16 23:21:26 +03004192 )
4193 for name, func in plats.items():
4194 plat = func()
4195 if plat and plat[0]:
4196 plat = '%s %r' % (name, plat)
4197 break
4198 else:
4199 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004200 print("test_ssl: testing with %r %r" %
4201 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4202 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004203 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004204 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4205 try:
4206 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4207 except AttributeError:
4208 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004209
Antoine Pitrou152efa22010-05-16 18:19:27 +00004210 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004211 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004212 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004213 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004214 BADCERT, BADKEY, EMPTYCERT]:
4215 if not os.path.exists(filename):
4216 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004217
Martin Panter3840b2a2016-03-27 01:53:46 +00004218 tests = [
4219 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004220 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00004221 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004222
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004223 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004224 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004225
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004226 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004227 try:
4228 support.run_unittest(*tests)
4229 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004230 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004231
4232if __name__ == "__main__":
4233 test_main()