blob: d4132c5043c7ae0ac5d197cc58b8f43a0563ce7e [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Miss Islington (bot)2614ed42018-02-27 00:17:49 -080033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -070089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Miss Islington (bot)e3228a32018-08-14 10:52:27 -0400120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -0800128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -0800148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou23df4832010-08-04 17:14:06 +0000183# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
184def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200185 if hasattr(ssl, 'PROTOCOL_SSLv2'):
186 @functools.wraps(func)
187 def f(*args, **kwargs):
188 try:
189 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
190 except ssl.SSLError:
191 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
192 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
193 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
194 return func(*args, **kwargs)
195 return f
196 else:
197 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100199needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
200
Antoine Pitrou23df4832010-08-04 17:14:06 +0000201
Christian Heimesd0486372016-09-10 23:23:33 +0200202def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
203 cert_reqs=ssl.CERT_NONE, ca_certs=None,
204 ciphers=None, certfile=None, keyfile=None,
205 **kwargs):
206 context = ssl.SSLContext(ssl_version)
207 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200208 if cert_reqs == ssl.CERT_NONE:
209 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200210 context.verify_mode = cert_reqs
211 if ca_certs is not None:
212 context.load_verify_locations(ca_certs)
213 if certfile is not None or keyfile is not None:
214 context.load_cert_chain(certfile, keyfile)
215 if ciphers is not None:
216 context.set_ciphers(ciphers)
217 return context.wrap_socket(sock, **kwargs)
218
Christian Heimesa170fa12017-09-15 20:27:30 +0200219
220def testing_context(server_cert=SIGNED_CERTFILE):
221 """Create context
222
223 client_context, server_context, hostname = testing_context()
224 """
225 if server_cert == SIGNED_CERTFILE:
226 hostname = SIGNED_CERTFILE_HOSTNAME
227 elif server_cert == SIGNED_CERTFILE2:
228 hostname = SIGNED_CERTFILE2_HOSTNAME
229 else:
230 raise ValueError(server_cert)
231
232 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
233 client_context.load_verify_locations(SIGNING_CA)
234
235 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
236 server_context.load_cert_chain(server_cert)
Christian Heimes2756ef32018-09-23 09:22:52 +0200237 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200238
239 return client_context, server_context, hostname
240
241
Antoine Pitrou152efa22010-05-16 18:19:27 +0000242class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000243
Antoine Pitrou480a1242010-04-28 21:37:09 +0000244 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000245 ssl.CERT_NONE
246 ssl.CERT_OPTIONAL
247 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100248 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100249 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100250 if ssl.HAS_ECDH:
251 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100252 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
253 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000254 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100255 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700256 ssl.OP_NO_SSLv2
257 ssl.OP_NO_SSLv3
258 ssl.OP_NO_TLSv1
259 ssl.OP_NO_TLSv1_3
260 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
261 ssl.OP_NO_TLSv1_1
262 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200263 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000264
Christian Heimes89c20512018-02-27 11:17:32 +0100265 def test_private_init(self):
266 with self.assertRaisesRegex(TypeError, "public constructor"):
267 with socket.socket() as s:
268 ssl.SSLSocket(s)
269
Antoine Pitrou172f0252014-04-18 20:33:08 +0200270 def test_str_for_enums(self):
271 # Make sure that the PROTOCOL_* constants have enum-like string
272 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200273 proto = ssl.PROTOCOL_TLS
274 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200275 ctx = ssl.SSLContext(proto)
276 self.assertIs(ctx.protocol, proto)
277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n RAND_status is %d (%s)\n"
282 % (v, (v and "sufficient randomness") or
283 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200284
285 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
286 self.assertEqual(len(data), 16)
287 self.assertEqual(is_cryptographic, v == 1)
288 if v:
289 data = ssl.RAND_bytes(16)
290 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200291 else:
292 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200293
Victor Stinner1e81a392013-12-19 16:47:04 +0100294 # negative num is invalid
295 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
296 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
297
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100298 if hasattr(ssl, 'RAND_egd'):
299 self.assertRaises(TypeError, ssl.RAND_egd, 1)
300 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000301 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200302 ssl.RAND_add(b"this is a random bytes object", 75.0)
303 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000304
Christian Heimesf77b4b22013-08-21 13:26:05 +0200305 @unittest.skipUnless(os.name == 'posix', 'requires posix')
306 def test_random_fork(self):
307 status = ssl.RAND_status()
308 if not status:
309 self.fail("OpenSSL's PRNG has insufficient randomness")
310
311 rfd, wfd = os.pipe()
312 pid = os.fork()
313 if pid == 0:
314 try:
315 os.close(rfd)
316 child_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(child_random), 16)
318 os.write(wfd, child_random)
319 os.close(wfd)
320 except BaseException:
321 os._exit(1)
322 else:
323 os._exit(0)
324 else:
325 os.close(wfd)
326 self.addCleanup(os.close, rfd)
327 _, status = os.waitpid(pid, 0)
328 self.assertEqual(status, 0)
329
330 child_random = os.read(rfd, 16)
331 self.assertEqual(len(child_random), 16)
332 parent_random = ssl.RAND_pseudo_bytes(16)[0]
333 self.assertEqual(len(parent_random), 16)
334
335 self.assertNotEqual(child_random, parent_random)
336
Miss Islington (bot)2d3f2dc2018-09-06 06:13:24 -0700337 maxDiff = None
338
Antoine Pitrou480a1242010-04-28 21:37:09 +0000339 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000340 # note that this uses an 'unofficial' function in _ssl.c,
341 # provided solely for this test, to exercise the certificate
342 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100343 self.assertEqual(
344 ssl._ssl._test_decode_cert(CERTFILE),
345 CERTFILE_INFO
346 )
347 self.assertEqual(
348 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
349 SIGNED_CERTFILE_INFO
350 )
351
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200352 # Issue #13034: the subjectAltName in some certificates
353 # (notably projects.developer.nokia.com:443) wasn't parsed
354 p = ssl._ssl._test_decode_cert(NOKIACERT)
355 if support.verbose:
356 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
357 self.assertEqual(p['subjectAltName'],
358 (('DNS', 'projects.developer.nokia.com'),
359 ('DNS', 'projects.forum.nokia.com'))
360 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100361 # extra OCSP and AIA fields
362 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
363 self.assertEqual(p['caIssuers'],
364 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
365 self.assertEqual(p['crlDistributionPoints'],
366 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000367
Christian Heimes824f7f32013-08-17 00:54:47 +0200368 def test_parse_cert_CVE_2013_4238(self):
369 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
370 if support.verbose:
371 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
372 subject = ((('countryName', 'US'),),
373 (('stateOrProvinceName', 'Oregon'),),
374 (('localityName', 'Beaverton'),),
375 (('organizationName', 'Python Software Foundation'),),
376 (('organizationalUnitName', 'Python Core Development'),),
377 (('commonName', 'null.python.org\x00example.org'),),
378 (('emailAddress', 'python-dev@python.org'),))
379 self.assertEqual(p['subject'], subject)
380 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200381 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
382 san = (('DNS', 'altnull.python.org\x00example.com'),
383 ('email', 'null@python.org\x00user@example.org'),
384 ('URI', 'http://null.python.org\x00http://example.org'),
385 ('IP Address', '192.0.2.1'),
386 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
387 else:
388 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
389 san = (('DNS', 'altnull.python.org\x00example.com'),
390 ('email', 'null@python.org\x00user@example.org'),
391 ('URI', 'http://null.python.org\x00http://example.org'),
392 ('IP Address', '192.0.2.1'),
393 ('IP Address', '<invalid>'))
394
395 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200396
Christian Heimes1c03abd2016-09-06 23:25:35 +0200397 def test_parse_all_sans(self):
398 p = ssl._ssl._test_decode_cert(ALLSANFILE)
399 self.assertEqual(p['subjectAltName'],
400 (
401 ('DNS', 'allsans'),
402 ('othername', '<unsupported>'),
403 ('othername', '<unsupported>'),
404 ('email', 'user@example.org'),
405 ('DNS', 'www.example.org'),
406 ('DirName',
407 ((('countryName', 'XY'),),
408 (('localityName', 'Castle Anthrax'),),
409 (('organizationName', 'Python Software Foundation'),),
410 (('commonName', 'dirname example'),))),
411 ('URI', 'https://www.python.org/'),
412 ('IP Address', '127.0.0.1'),
413 ('IP Address', '0:0:0:0:0:0:0:1\n'),
414 ('Registered ID', '1.2.3.4.5')
415 )
416 )
417
Antoine Pitrou480a1242010-04-28 21:37:09 +0000418 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000419 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000420 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000421 d1 = ssl.PEM_cert_to_DER_cert(pem)
422 p2 = ssl.DER_cert_to_PEM_cert(d1)
423 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000424 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000425 if not p2.startswith(ssl.PEM_HEADER + '\n'):
426 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
427 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
428 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000429
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000430 def test_openssl_version(self):
431 n = ssl.OPENSSL_VERSION_NUMBER
432 t = ssl.OPENSSL_VERSION_INFO
433 s = ssl.OPENSSL_VERSION
434 self.assertIsInstance(n, int)
435 self.assertIsInstance(t, tuple)
436 self.assertIsInstance(s, str)
437 # Some sanity checks follow
438 # >= 0.9
439 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400440 # < 3.0
441 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000442 major, minor, fix, patch, status = t
443 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400444 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000445 self.assertGreaterEqual(minor, 0)
446 self.assertLess(minor, 256)
447 self.assertGreaterEqual(fix, 0)
448 self.assertLess(fix, 256)
449 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100450 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000451 self.assertGreaterEqual(status, 0)
452 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400453 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200454 if IS_LIBRESSL:
455 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100456 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400457 else:
458 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100459 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000460
Antoine Pitrou9d543662010-04-23 23:10:32 +0000461 @support.cpython_only
462 def test_refcycle(self):
463 # Issue #7943: an SSL object doesn't create reference cycles with
464 # itself.
465 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200466 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000467 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100468 with support.check_warnings(("", ResourceWarning)):
469 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100470 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000471
Antoine Pitroua468adc2010-09-14 14:43:44 +0000472 def test_wrapped_unconnected(self):
473 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200474 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000475 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100477 self.assertRaises(OSError, ss.recv, 1)
478 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
479 self.assertRaises(OSError, ss.recvfrom, 1)
480 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
481 self.assertRaises(OSError, ss.send, b'x')
482 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800483 self.assertRaises(NotImplementedError, ss.sendmsg,
484 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000485
Antoine Pitrou40f08742010-04-24 22:04:40 +0000486 def test_timeout(self):
487 # Issue #8524: when creating an SSL socket, the timeout of the
488 # original socket should be retained.
489 for timeout in (None, 0.0, 5.0):
490 s = socket.socket(socket.AF_INET)
491 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200492 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100493 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000494
Christian Heimesd0486372016-09-10 23:23:33 +0200495 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000497 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000498 "certfile must be specified",
499 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000500 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000501 "certfile must be specified for server-side operations",
502 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000503 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000504 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200505 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100506 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
507 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200508 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200509 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000510 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000511 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000512 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200513 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000515 ssl.wrap_socket(sock,
516 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000517 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200518 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000519 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000520 ssl.wrap_socket(sock,
521 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000522 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000523
Martin Panter3464ea22016-02-01 21:58:11 +0000524 def bad_cert_test(self, certfile):
525 """Check that trying to use the given client certificate fails"""
526 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
527 certfile)
528 sock = socket.socket()
529 self.addCleanup(sock.close)
530 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200531 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200532 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000533
534 def test_empty_cert(self):
535 """Wrapping with an empty cert file"""
536 self.bad_cert_test("nullcert.pem")
537
538 def test_malformed_cert(self):
539 """Wrapping with a badly formatted certificate (syntax error)"""
540 self.bad_cert_test("badcert.pem")
541
542 def test_malformed_key(self):
543 """Wrapping with a badly formatted key (syntax error)"""
544 self.bad_cert_test("badkey.pem")
545
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000546 def test_match_hostname(self):
547 def ok(cert, hostname):
548 ssl.match_hostname(cert, hostname)
549 def fail(cert, hostname):
550 self.assertRaises(ssl.CertificateError,
551 ssl.match_hostname, cert, hostname)
552
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100553 # -- Hostname matching --
554
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'example.com'),),)}
556 ok(cert, 'example.com')
557 ok(cert, 'ExAmple.cOm')
558 fail(cert, 'www.example.com')
559 fail(cert, '.example.com')
560 fail(cert, 'example.org')
561 fail(cert, 'exampleXcom')
562
563 cert = {'subject': ((('commonName', '*.a.com'),),)}
564 ok(cert, 'foo.a.com')
565 fail(cert, 'bar.foo.a.com')
566 fail(cert, 'a.com')
567 fail(cert, 'Xa.com')
568 fail(cert, '.a.com')
569
Mandeep Singhede2ac92017-11-27 04:01:27 +0530570 # only match wildcards when they are the only thing
571 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000572 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530573 fail(cert, 'foo.com')
574 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000575 fail(cert, 'bar.com')
576 fail(cert, 'foo.a.com')
577 fail(cert, 'bar.foo.com')
578
Christian Heimes824f7f32013-08-17 00:54:47 +0200579 # NULL bytes are bad, CVE-2013-4073
580 cert = {'subject': ((('commonName',
581 'null.python.org\x00example.org'),),)}
582 ok(cert, 'null.python.org\x00example.org') # or raise an error?
583 fail(cert, 'example.org')
584 fail(cert, 'null.python.org')
585
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 # error cases with wildcards
587 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
588 fail(cert, 'bar.foo.a.com')
589 fail(cert, 'a.com')
590 fail(cert, 'Xa.com')
591 fail(cert, '.a.com')
592
593 cert = {'subject': ((('commonName', 'a.*.com'),),)}
594 fail(cert, 'a.foo.com')
595 fail(cert, 'a..com')
596 fail(cert, 'a.com')
597
598 # wildcard doesn't match IDNA prefix 'xn--'
599 idna = 'püthon.python.org'.encode("idna").decode("ascii")
600 cert = {'subject': ((('commonName', idna),),)}
601 ok(cert, idna)
602 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
603 fail(cert, idna)
604 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
605 fail(cert, idna)
606
607 # wildcard in first fragment and IDNA A-labels in sequent fragments
608 # are supported.
609 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
610 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530611 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
612 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
614 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
615
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000616 # Slightly fake real-world example
617 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
618 'subject': ((('commonName', 'linuxfrz.org'),),),
619 'subjectAltName': (('DNS', 'linuxfr.org'),
620 ('DNS', 'linuxfr.com'),
621 ('othername', '<unsupported>'))}
622 ok(cert, 'linuxfr.org')
623 ok(cert, 'linuxfr.com')
624 # Not a "DNS" entry
625 fail(cert, '<unsupported>')
626 # When there is a subjectAltName, commonName isn't used
627 fail(cert, 'linuxfrz.org')
628
629 # A pristine real-world example
630 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
631 'subject': ((('countryName', 'US'),),
632 (('stateOrProvinceName', 'California'),),
633 (('localityName', 'Mountain View'),),
634 (('organizationName', 'Google Inc'),),
635 (('commonName', 'mail.google.com'),))}
636 ok(cert, 'mail.google.com')
637 fail(cert, 'gmail.com')
638 # Only commonName is considered
639 fail(cert, 'California')
640
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100641 # -- IPv4 matching --
642 cert = {'subject': ((('commonName', 'example.com'),),),
643 'subjectAltName': (('DNS', 'example.com'),
644 ('IP Address', '10.11.12.13'),
645 ('IP Address', '14.15.16.17'))}
646 ok(cert, '10.11.12.13')
647 ok(cert, '14.15.16.17')
648 fail(cert, '14.15.16.18')
649 fail(cert, 'example.net')
650
651 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800652 if hasattr(socket, 'AF_INET6'):
653 cert = {'subject': ((('commonName', 'example.com'),),),
654 'subjectAltName': (
655 ('DNS', 'example.com'),
656 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
657 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
658 ok(cert, '2001::cafe')
659 ok(cert, '2003::baba')
660 fail(cert, '2003::bebe')
661 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100662
663 # -- Miscellaneous --
664
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000665 # Neither commonName nor subjectAltName
666 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),))}
671 fail(cert, 'mail.google.com')
672
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200673 # No DNS entry in subjectAltName but a commonName
674 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
675 'subject': ((('countryName', 'US'),),
676 (('stateOrProvinceName', 'California'),),
677 (('localityName', 'Mountain View'),),
678 (('commonName', 'mail.google.com'),)),
679 'subjectAltName': (('othername', 'blabla'), )}
680 ok(cert, 'mail.google.com')
681
682 # No DNS entry subjectAltName and no commonName
683 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
684 'subject': ((('countryName', 'US'),),
685 (('stateOrProvinceName', 'California'),),
686 (('localityName', 'Mountain View'),),
687 (('organizationName', 'Google Inc'),)),
688 'subjectAltName': (('othername', 'blabla'),)}
689 fail(cert, 'google.com')
690
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000691 # Empty cert / no cert
692 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
693 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
694
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200695 # Issue #17980: avoid denials of service by refusing more than one
696 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800697 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
698 with self.assertRaisesRegex(
699 ssl.CertificateError,
700 "partial wildcards in leftmost label are not supported"):
701 ssl.match_hostname(cert, 'axxb.example.com')
702
703 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
704 with self.assertRaisesRegex(
705 ssl.CertificateError,
706 "wildcard can only be present in the leftmost label"):
707 ssl.match_hostname(cert, 'www.sub.example.com')
708
709 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
710 with self.assertRaisesRegex(
711 ssl.CertificateError,
712 "too many wildcards"):
713 ssl.match_hostname(cert, 'axxbxxc.example.com')
714
715 cert = {'subject': ((('commonName', '*'),),)}
716 with self.assertRaisesRegex(
717 ssl.CertificateError,
718 "sole wildcard without additional labels are not support"):
719 ssl.match_hostname(cert, 'host')
720
721 cert = {'subject': ((('commonName', '*.com'),),)}
722 with self.assertRaisesRegex(
723 ssl.CertificateError,
724 r"hostname 'com' doesn't match '\*.com'"):
725 ssl.match_hostname(cert, 'com')
726
727 # extra checks for _inet_paton()
728 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
729 with self.assertRaises(ValueError):
730 ssl._inet_paton(invalid)
731 for ipaddr in ['127.0.0.1', '192.168.0.1']:
732 self.assertTrue(ssl._inet_paton(ipaddr))
733 if hasattr(socket, 'AF_INET6'):
734 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
735 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200736
Antoine Pitroud5323212010-10-22 18:19:07 +0000737 def test_server_side(self):
738 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200739 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000740 with socket.socket() as sock:
741 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
742 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000743
Antoine Pitroud6494802011-07-21 01:11:30 +0200744 def test_unknown_channel_binding(self):
745 # should raise ValueError for unknown type
746 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200747 s.bind(('127.0.0.1', 0))
748 s.listen()
749 c = socket.socket(socket.AF_INET)
750 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200751 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100752 with self.assertRaises(ValueError):
753 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200754 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200755
756 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
757 "'tls-unique' channel binding not available")
758 def test_tls_unique_channel_binding(self):
759 # unconnected should return None for known type
760 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200761 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100762 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200763 # the same for server-side
764 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200765 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100766 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200767
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600768 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200769 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600770 r = repr(ss)
771 with self.assertWarns(ResourceWarning) as cm:
772 ss = None
773 support.gc_collect()
774 self.assertIn(r, str(cm.warning.args[0]))
775
Christian Heimes6d7ad132013-06-09 18:02:55 +0200776 def test_get_default_verify_paths(self):
777 paths = ssl.get_default_verify_paths()
778 self.assertEqual(len(paths), 6)
779 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
780
781 with support.EnvironmentVarGuard() as env:
782 env["SSL_CERT_DIR"] = CAPATH
783 env["SSL_CERT_FILE"] = CERTFILE
784 paths = ssl.get_default_verify_paths()
785 self.assertEqual(paths.cafile, CERTFILE)
786 self.assertEqual(paths.capath, CAPATH)
787
Christian Heimes44109d72013-11-22 01:51:30 +0100788 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
789 def test_enum_certificates(self):
790 self.assertTrue(ssl.enum_certificates("CA"))
791 self.assertTrue(ssl.enum_certificates("ROOT"))
792
793 self.assertRaises(TypeError, ssl.enum_certificates)
794 self.assertRaises(WindowsError, ssl.enum_certificates, "")
795
Christian Heimesc2d65e12013-11-22 16:13:55 +0100796 trust_oids = set()
797 for storename in ("CA", "ROOT"):
798 store = ssl.enum_certificates(storename)
799 self.assertIsInstance(store, list)
800 for element in store:
801 self.assertIsInstance(element, tuple)
802 self.assertEqual(len(element), 3)
803 cert, enc, trust = element
804 self.assertIsInstance(cert, bytes)
805 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
806 self.assertIsInstance(trust, (set, bool))
807 if isinstance(trust, set):
808 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100809
810 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100811 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200812
Christian Heimes46bebee2013-06-09 19:03:31 +0200813 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100814 def test_enum_crls(self):
815 self.assertTrue(ssl.enum_crls("CA"))
816 self.assertRaises(TypeError, ssl.enum_crls)
817 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200818
Christian Heimes44109d72013-11-22 01:51:30 +0100819 crls = ssl.enum_crls("CA")
820 self.assertIsInstance(crls, list)
821 for element in crls:
822 self.assertIsInstance(element, tuple)
823 self.assertEqual(len(element), 2)
824 self.assertIsInstance(element[0], bytes)
825 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200826
Christian Heimes46bebee2013-06-09 19:03:31 +0200827
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100828 def test_asn1object(self):
829 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
830 '1.3.6.1.5.5.7.3.1')
831
832 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
833 self.assertEqual(val, expected)
834 self.assertEqual(val.nid, 129)
835 self.assertEqual(val.shortname, 'serverAuth')
836 self.assertEqual(val.longname, 'TLS Web Server Authentication')
837 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
838 self.assertIsInstance(val, ssl._ASN1Object)
839 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
840
841 val = ssl._ASN1Object.fromnid(129)
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100845 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
846 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100847 for i in range(1000):
848 try:
849 obj = ssl._ASN1Object.fromnid(i)
850 except ValueError:
851 pass
852 else:
853 self.assertIsInstance(obj.nid, int)
854 self.assertIsInstance(obj.shortname, str)
855 self.assertIsInstance(obj.longname, str)
856 self.assertIsInstance(obj.oid, (str, type(None)))
857
858 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
859 self.assertEqual(val, expected)
860 self.assertIsInstance(val, ssl._ASN1Object)
861 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
862 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
863 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100864 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
865 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100866
Christian Heimes72d28502013-11-23 13:56:58 +0100867 def test_purpose_enum(self):
868 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
869 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
870 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
871 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
872 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
873 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
874 '1.3.6.1.5.5.7.3.1')
875
876 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
877 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
878 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
879 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
880 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
881 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
882 '1.3.6.1.5.5.7.3.2')
883
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100884 def test_unsupported_dtls(self):
885 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
886 self.addCleanup(s.close)
887 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200888 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100889 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200890 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100891 with self.assertRaises(NotImplementedError) as cx:
892 ctx.wrap_socket(s)
893 self.assertEqual(str(cx.exception), "only stream sockets are supported")
894
Antoine Pitrouc695c952014-04-28 20:57:36 +0200895 def cert_time_ok(self, timestring, timestamp):
896 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
897
898 def cert_time_fail(self, timestring):
899 with self.assertRaises(ValueError):
900 ssl.cert_time_to_seconds(timestring)
901
902 @unittest.skipUnless(utc_offset(),
903 'local time needs to be different from UTC')
904 def test_cert_time_to_seconds_timezone(self):
905 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
906 # results if local timezone is not UTC
907 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
908 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
909
910 def test_cert_time_to_seconds(self):
911 timestring = "Jan 5 09:34:43 2018 GMT"
912 ts = 1515144883.0
913 self.cert_time_ok(timestring, ts)
914 # accept keyword parameter, assert its name
915 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
916 # accept both %e and %d (space or zero generated by strftime)
917 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
918 # case-insensitive
919 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
920 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
921 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
922 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
923 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
924 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
925 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
926 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
927
928 newyear_ts = 1230768000.0
929 # leap seconds
930 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
931 # same timestamp
932 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
933
934 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
935 # allow 60th second (even if it is not a leap second)
936 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
937 # allow 2nd leap second for compatibility with time.strptime()
938 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
939 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
940
Mike53f7a7c2017-12-14 14:04:53 +0300941 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200942 # 99991231235959Z (rfc 5280)
943 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
944
945 @support.run_with_locale('LC_ALL', '')
946 def test_cert_time_to_seconds_locale(self):
947 # `cert_time_to_seconds()` should be locale independent
948
949 def local_february_name():
950 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
951
952 if local_february_name().lower() == 'feb':
953 self.skipTest("locale-specific month name needs to be "
954 "different from C locale")
955
956 # locale-independent
957 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
958 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
959
Martin Panter3840b2a2016-03-27 01:53:46 +0000960 def test_connect_ex_error(self):
961 server = socket.socket(socket.AF_INET)
962 self.addCleanup(server.close)
963 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200964 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000965 cert_reqs=ssl.CERT_REQUIRED)
966 self.addCleanup(s.close)
967 rc = s.connect_ex((HOST, port))
968 # Issue #19919: Windows machines or VMs hosted on Windows
969 # machines sometimes return EWOULDBLOCK.
970 errors = (
971 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
972 errno.EWOULDBLOCK,
973 )
974 self.assertIn(rc, errors)
975
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100976
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977class ContextTests(unittest.TestCase):
978
Antoine Pitrou23df4832010-08-04 17:14:06 +0000979 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000980 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100981 for protocol in PROTOCOLS:
982 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200983 ctx = ssl.SSLContext()
984 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000985 self.assertRaises(ValueError, ssl.SSLContext, -1)
986 self.assertRaises(ValueError, ssl.SSLContext, 42)
987
Antoine Pitrou23df4832010-08-04 17:14:06 +0000988 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000989 def test_protocol(self):
990 for proto in PROTOCOLS:
991 ctx = ssl.SSLContext(proto)
992 self.assertEqual(ctx.protocol, proto)
993
994 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000996 ctx.set_ciphers("ALL")
997 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000998 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000999 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001000
Christian Heimes892d66e2018-01-29 14:10:18 +01001001 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1002 "Test applies only to Python default ciphers")
1003 def test_python_ciphers(self):
1004 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1005 ciphers = ctx.get_ciphers()
1006 for suite in ciphers:
1007 name = suite['name']
1008 self.assertNotIn("PSK", name)
1009 self.assertNotIn("SRP", name)
1010 self.assertNotIn("MD5", name)
1011 self.assertNotIn("RC4", name)
1012 self.assertNotIn("3DES", name)
1013
Christian Heimes25bfcd52016-09-06 00:04:45 +02001014 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1015 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001016 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001017 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001018 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001019 self.assertIn('AES256-GCM-SHA384', names)
1020 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001021
Antoine Pitrou23df4832010-08-04 17:14:06 +00001022 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +00001023 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001024 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001025 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001026 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001027 # SSLContext also enables these by default
1028 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08001029 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1030 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001031 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001032 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001033 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001034 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001035 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1036 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001037 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001038 # Ubuntu has OP_NO_SSLv3 forced on by default
1039 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001040 else:
1041 with self.assertRaises(ValueError):
1042 ctx.options = 0
1043
Christian Heimesa170fa12017-09-15 20:27:30 +02001044 def test_verify_mode_protocol(self):
1045 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001046 # Default value
1047 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1048 ctx.verify_mode = ssl.CERT_OPTIONAL
1049 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1050 ctx.verify_mode = ssl.CERT_REQUIRED
1051 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1052 ctx.verify_mode = ssl.CERT_NONE
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1054 with self.assertRaises(TypeError):
1055 ctx.verify_mode = None
1056 with self.assertRaises(ValueError):
1057 ctx.verify_mode = 42
1058
Christian Heimesa170fa12017-09-15 20:27:30 +02001059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1060 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1061 self.assertFalse(ctx.check_hostname)
1062
1063 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1065 self.assertTrue(ctx.check_hostname)
1066
Christian Heimes61d478c2018-01-27 15:51:38 +01001067 def test_hostname_checks_common_name(self):
1068 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1069 self.assertTrue(ctx.hostname_checks_common_name)
1070 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1071 ctx.hostname_checks_common_name = True
1072 self.assertTrue(ctx.hostname_checks_common_name)
1073 ctx.hostname_checks_common_name = False
1074 self.assertFalse(ctx.hostname_checks_common_name)
1075 ctx.hostname_checks_common_name = True
1076 self.assertTrue(ctx.hostname_checks_common_name)
1077 else:
1078 with self.assertRaises(AttributeError):
1079 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001080
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08001081 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1082 "required OpenSSL 1.1.0g")
1083 def test_min_max_version(self):
1084 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1085 self.assertEqual(
1086 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1087 )
1088 self.assertEqual(
1089 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1090 )
1091
1092 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1093 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1094 self.assertEqual(
1095 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1096 )
1097 self.assertEqual(
1098 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1102 ctx.maximum_version = ssl.TLSVersion.TLSv1
1103 self.assertEqual(
1104 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1105 )
1106 self.assertEqual(
1107 ctx.maximum_version, ssl.TLSVersion.TLSv1
1108 )
1109
1110 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1111 self.assertEqual(
1112 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1113 )
1114
1115 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1116 self.assertIn(
1117 ctx.maximum_version,
1118 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1119 )
1120
1121 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1122 self.assertIn(
1123 ctx.minimum_version,
1124 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1125 )
1126
1127 with self.assertRaises(ValueError):
1128 ctx.minimum_version = 42
1129
1130 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1131
1132 self.assertEqual(
1133 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1134 )
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 )
1138 with self.assertRaises(ValueError):
1139 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1140 with self.assertRaises(ValueError):
1141 ctx.maximum_version = ssl.TLSVersion.TLSv1
1142
1143
Christian Heimes2427b502013-11-23 11:24:32 +01001144 @unittest.skipUnless(have_verify_flags(),
1145 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001146 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001147 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001148 # default value
1149 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1150 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001151 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1152 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1153 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1154 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1155 ctx.verify_flags = ssl.VERIFY_DEFAULT
1156 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1157 # supports any value
1158 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1159 self.assertEqual(ctx.verify_flags,
1160 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1161 with self.assertRaises(TypeError):
1162 ctx.verify_flags = None
1163
Antoine Pitrou152efa22010-05-16 18:19:27 +00001164 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001165 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001166 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001167 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1169 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001170 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001171 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001172 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001173 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001174 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001175 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001176 ctx.load_cert_chain(EMPTYCERT)
1177 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001178 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001179 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1180 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1181 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001182 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001183 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001184 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001185 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001186 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001187 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1188 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001189 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001190 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001191 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001192 # Password protected key and cert
1193 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1194 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1195 ctx.load_cert_chain(CERTFILE_PROTECTED,
1196 password=bytearray(KEY_PASSWORD.encode()))
1197 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1198 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1199 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1200 bytearray(KEY_PASSWORD.encode()))
1201 with self.assertRaisesRegex(TypeError, "should be a string"):
1202 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1203 with self.assertRaises(ssl.SSLError):
1204 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1205 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1206 # openssl has a fixed limit on the password buffer.
1207 # PEM_BUFSIZE is generally set to 1kb.
1208 # Return a string larger than this.
1209 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1210 # Password callback
1211 def getpass_unicode():
1212 return KEY_PASSWORD
1213 def getpass_bytes():
1214 return KEY_PASSWORD.encode()
1215 def getpass_bytearray():
1216 return bytearray(KEY_PASSWORD.encode())
1217 def getpass_badpass():
1218 return "badpass"
1219 def getpass_huge():
1220 return b'a' * (1024 * 1024)
1221 def getpass_bad_type():
1222 return 9
1223 def getpass_exception():
1224 raise Exception('getpass error')
1225 class GetPassCallable:
1226 def __call__(self):
1227 return KEY_PASSWORD
1228 def getpass(self):
1229 return KEY_PASSWORD
1230 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1232 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1233 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1234 ctx.load_cert_chain(CERTFILE_PROTECTED,
1235 password=GetPassCallable().getpass)
1236 with self.assertRaises(ssl.SSLError):
1237 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1238 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1239 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1240 with self.assertRaisesRegex(TypeError, "must return a string"):
1241 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1242 with self.assertRaisesRegex(Exception, "getpass error"):
1243 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1244 # Make sure the password function isn't called if it isn't needed
1245 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001246
1247 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001248 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001249 ctx.load_verify_locations(CERTFILE)
1250 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1251 ctx.load_verify_locations(BYTES_CERTFILE)
1252 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1253 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001254 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001255 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001256 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001257 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001258 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001259 ctx.load_verify_locations(BADCERT)
1260 ctx.load_verify_locations(CERTFILE, CAPATH)
1261 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1262
Victor Stinner80f75e62011-01-29 11:31:20 +00001263 # Issue #10989: crash if the second argument type is invalid
1264 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1265
Christian Heimesefff7062013-11-21 03:35:02 +01001266 def test_load_verify_cadata(self):
1267 # test cadata
1268 with open(CAFILE_CACERT) as f:
1269 cacert_pem = f.read()
1270 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1271 with open(CAFILE_NEURONIO) as f:
1272 neuronio_pem = f.read()
1273 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1274
1275 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001276 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001277 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1278 ctx.load_verify_locations(cadata=cacert_pem)
1279 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1280 ctx.load_verify_locations(cadata=neuronio_pem)
1281 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1282 # cert already in hash table
1283 ctx.load_verify_locations(cadata=neuronio_pem)
1284 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1285
1286 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001287 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001288 combined = "\n".join((cacert_pem, neuronio_pem))
1289 ctx.load_verify_locations(cadata=combined)
1290 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1291
1292 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001293 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001294 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1295 neuronio_pem, "tail"]
1296 ctx.load_verify_locations(cadata="\n".join(combined))
1297 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1298
1299 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001300 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001301 ctx.load_verify_locations(cadata=cacert_der)
1302 ctx.load_verify_locations(cadata=neuronio_der)
1303 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1304 # cert already in hash table
1305 ctx.load_verify_locations(cadata=cacert_der)
1306 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1307
1308 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001310 combined = b"".join((cacert_der, neuronio_der))
1311 ctx.load_verify_locations(cadata=combined)
1312 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1313
1314 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001315 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001316 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1317
1318 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1319 ctx.load_verify_locations(cadata="broken")
1320 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1321 ctx.load_verify_locations(cadata=b"broken")
1322
1323
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001324 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001325 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001326 ctx.load_dh_params(DHFILE)
1327 if os.name != 'nt':
1328 ctx.load_dh_params(BYTES_DHFILE)
1329 self.assertRaises(TypeError, ctx.load_dh_params)
1330 self.assertRaises(TypeError, ctx.load_dh_params, None)
1331 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001332 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001333 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001334 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001335 ctx.load_dh_params(CERTFILE)
1336
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001337 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001338 def test_session_stats(self):
1339 for proto in PROTOCOLS:
1340 ctx = ssl.SSLContext(proto)
1341 self.assertEqual(ctx.session_stats(), {
1342 'number': 0,
1343 'connect': 0,
1344 'connect_good': 0,
1345 'connect_renegotiate': 0,
1346 'accept': 0,
1347 'accept_good': 0,
1348 'accept_renegotiate': 0,
1349 'hits': 0,
1350 'misses': 0,
1351 'timeouts': 0,
1352 'cache_full': 0,
1353 })
1354
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001355 def test_set_default_verify_paths(self):
1356 # There's not much we can do to test that it acts as expected,
1357 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001359 ctx.set_default_verify_paths()
1360
Antoine Pitrou501da612011-12-21 09:27:41 +01001361 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001362 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001363 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001364 ctx.set_ecdh_curve("prime256v1")
1365 ctx.set_ecdh_curve(b"prime256v1")
1366 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1367 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1368 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1369 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1370
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001371 @needs_sni
1372 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001373 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001374
1375 # set_servername_callback expects a callable, or None
1376 self.assertRaises(TypeError, ctx.set_servername_callback)
1377 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1378 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1379 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1380
1381 def dummycallback(sock, servername, ctx):
1382 pass
1383 ctx.set_servername_callback(None)
1384 ctx.set_servername_callback(dummycallback)
1385
1386 @needs_sni
1387 def test_sni_callback_refcycle(self):
1388 # Reference cycles through the servername callback are detected
1389 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001390 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001391 def dummycallback(sock, servername, ctx, cycle=ctx):
1392 pass
1393 ctx.set_servername_callback(dummycallback)
1394 wr = weakref.ref(ctx)
1395 del ctx, dummycallback
1396 gc.collect()
1397 self.assertIs(wr(), None)
1398
Christian Heimes9a5395a2013-06-17 15:44:12 +02001399 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001400 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001401 self.assertEqual(ctx.cert_store_stats(),
1402 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1403 ctx.load_cert_chain(CERTFILE)
1404 self.assertEqual(ctx.cert_store_stats(),
1405 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1406 ctx.load_verify_locations(CERTFILE)
1407 self.assertEqual(ctx.cert_store_stats(),
1408 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001409 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001410 self.assertEqual(ctx.cert_store_stats(),
1411 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1412
1413 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001414 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001415 self.assertEqual(ctx.get_ca_certs(), [])
1416 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1417 ctx.load_verify_locations(CERTFILE)
1418 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001419 # but CAFILE_CACERT is a CA cert
1420 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001421 self.assertEqual(ctx.get_ca_certs(),
1422 [{'issuer': ((('organizationName', 'Root CA'),),
1423 (('organizationalUnitName', 'http://www.cacert.org'),),
1424 (('commonName', 'CA Cert Signing Authority'),),
1425 (('emailAddress', 'support@cacert.org'),)),
1426 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1427 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1428 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001429 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001430 'subject': ((('organizationName', 'Root CA'),),
1431 (('organizationalUnitName', 'http://www.cacert.org'),),
1432 (('commonName', 'CA Cert Signing Authority'),),
1433 (('emailAddress', 'support@cacert.org'),)),
1434 'version': 3}])
1435
Martin Panterb55f8b72016-01-14 12:53:56 +00001436 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001437 pem = f.read()
1438 der = ssl.PEM_cert_to_DER_cert(pem)
1439 self.assertEqual(ctx.get_ca_certs(True), [der])
1440
Christian Heimes72d28502013-11-23 13:56:58 +01001441 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001442 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001443 ctx.load_default_certs()
1444
Christian Heimesa170fa12017-09-15 20:27:30 +02001445 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001446 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1447 ctx.load_default_certs()
1448
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001450 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1451
Christian Heimesa170fa12017-09-15 20:27:30 +02001452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001453 self.assertRaises(TypeError, ctx.load_default_certs, None)
1454 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1455
Benjamin Peterson91244e02014-10-03 18:17:15 -04001456 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001457 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001458 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001459 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001460 with support.EnvironmentVarGuard() as env:
1461 env["SSL_CERT_DIR"] = CAPATH
1462 env["SSL_CERT_FILE"] = CERTFILE
1463 ctx.load_default_certs()
1464 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1465
Benjamin Peterson91244e02014-10-03 18:17:15 -04001466 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001467 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001468 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001469 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001470 ctx.load_default_certs()
1471 stats = ctx.cert_store_stats()
1472
Christian Heimesa170fa12017-09-15 20:27:30 +02001473 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001474 with support.EnvironmentVarGuard() as env:
1475 env["SSL_CERT_DIR"] = CAPATH
1476 env["SSL_CERT_FILE"] = CERTFILE
1477 ctx.load_default_certs()
1478 stats["x509"] += 1
1479 self.assertEqual(ctx.cert_store_stats(), stats)
1480
Christian Heimes358cfd42016-09-10 22:43:48 +02001481 def _assert_context_options(self, ctx):
1482 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1483 if OP_NO_COMPRESSION != 0:
1484 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1485 OP_NO_COMPRESSION)
1486 if OP_SINGLE_DH_USE != 0:
1487 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1488 OP_SINGLE_DH_USE)
1489 if OP_SINGLE_ECDH_USE != 0:
1490 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1491 OP_SINGLE_ECDH_USE)
1492 if OP_CIPHER_SERVER_PREFERENCE != 0:
1493 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1494 OP_CIPHER_SERVER_PREFERENCE)
1495
Christian Heimes4c05b472013-11-23 15:58:30 +01001496 def test_create_default_context(self):
1497 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001498
Christian Heimesa170fa12017-09-15 20:27:30 +02001499 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001500 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001501 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001502 self._assert_context_options(ctx)
1503
Christian Heimes4c05b472013-11-23 15:58:30 +01001504 with open(SIGNING_CA) as f:
1505 cadata = f.read()
1506 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1507 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001508 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001509 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001510 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001511
1512 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001513 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001514 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001515 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001516
Christian Heimes67986f92013-11-23 22:43:47 +01001517 def test__create_stdlib_context(self):
1518 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001519 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001520 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001521 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001522 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001523
1524 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1525 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1526 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001527 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001528
1529 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001530 cert_reqs=ssl.CERT_REQUIRED,
1531 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001532 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001534 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001535 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001536
1537 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001538 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001539 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001540 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001543 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001544 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001545 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546
Christian Heimese82c0342017-09-15 20:29:57 +02001547 # Auto set CERT_REQUIRED
1548 ctx.check_hostname = True
1549 self.assertTrue(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1551 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001552 ctx.verify_mode = ssl.CERT_REQUIRED
1553 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001554 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
Christian Heimese82c0342017-09-15 20:29:57 +02001556 # Changing verify_mode does not affect check_hostname
1557 ctx.check_hostname = False
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
1561 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1562 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563 ctx.check_hostname = True
1564 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001565 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1566
1567 ctx.check_hostname = False
1568 ctx.verify_mode = ssl.CERT_OPTIONAL
1569 ctx.check_hostname = False
1570 self.assertFalse(ctx.check_hostname)
1571 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1572 # keep CERT_OPTIONAL
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001576
1577 # Cannot set CERT_NONE with check_hostname enabled
1578 with self.assertRaises(ValueError):
1579 ctx.verify_mode = ssl.CERT_NONE
1580 ctx.check_hostname = False
1581 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001582 ctx.verify_mode = ssl.CERT_NONE
1583 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
Christian Heimes5fe668c2016-09-12 00:01:11 +02001585 def test_context_client_server(self):
1586 # PROTOCOL_TLS_CLIENT has sane defaults
1587 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1588 self.assertTrue(ctx.check_hostname)
1589 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1590
1591 # PROTOCOL_TLS_SERVER has different but also sane defaults
1592 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1595
Christian Heimes4df60f12017-09-15 20:26:05 +02001596 def test_context_custom_class(self):
1597 class MySSLSocket(ssl.SSLSocket):
1598 pass
1599
1600 class MySSLObject(ssl.SSLObject):
1601 pass
1602
1603 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1604 ctx.sslsocket_class = MySSLSocket
1605 ctx.sslobject_class = MySSLObject
1606
1607 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1608 self.assertIsInstance(sock, MySSLSocket)
1609 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1610 self.assertIsInstance(obj, MySSLObject)
1611
Antoine Pitrou152efa22010-05-16 18:19:27 +00001612
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001613class SSLErrorTests(unittest.TestCase):
1614
1615 def test_str(self):
1616 # The str() of a SSLError doesn't include the errno
1617 e = ssl.SSLError(1, "foo")
1618 self.assertEqual(str(e), "foo")
1619 self.assertEqual(e.errno, 1)
1620 # Same for a subclass
1621 e = ssl.SSLZeroReturnError(1, "foo")
1622 self.assertEqual(str(e), "foo")
1623 self.assertEqual(e.errno, 1)
1624
1625 def test_lib_reason(self):
1626 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001627 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001628 with self.assertRaises(ssl.SSLError) as cm:
1629 ctx.load_dh_params(CERTFILE)
1630 self.assertEqual(cm.exception.library, 'PEM')
1631 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1632 s = str(cm.exception)
1633 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1634
1635 def test_subclass(self):
1636 # Check that the appropriate SSLError subclass is raised
1637 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001638 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1639 ctx.check_hostname = False
1640 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001641 with socket.socket() as s:
1642 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001643 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001644 c = socket.socket()
1645 c.connect(s.getsockname())
1646 c.setblocking(False)
1647 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001648 with self.assertRaises(ssl.SSLWantReadError) as cm:
1649 c.do_handshake()
1650 s = str(cm.exception)
1651 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1652 # For compatibility
1653 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1654
1655
Christian Heimes61d478c2018-01-27 15:51:38 +01001656 def test_bad_server_hostname(self):
1657 ctx = ssl.create_default_context()
1658 with self.assertRaises(ValueError):
1659 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1660 server_hostname="")
1661 with self.assertRaises(ValueError):
1662 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1663 server_hostname=".example.org")
Miss Islington (bot)2dd885e2018-03-25 04:28:20 -07001664 with self.assertRaises(TypeError):
1665 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1666 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001667
1668
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001669class MemoryBIOTests(unittest.TestCase):
1670
1671 def test_read_write(self):
1672 bio = ssl.MemoryBIO()
1673 bio.write(b'foo')
1674 self.assertEqual(bio.read(), b'foo')
1675 self.assertEqual(bio.read(), b'')
1676 bio.write(b'foo')
1677 bio.write(b'bar')
1678 self.assertEqual(bio.read(), b'foobar')
1679 self.assertEqual(bio.read(), b'')
1680 bio.write(b'baz')
1681 self.assertEqual(bio.read(2), b'ba')
1682 self.assertEqual(bio.read(1), b'z')
1683 self.assertEqual(bio.read(1), b'')
1684
1685 def test_eof(self):
1686 bio = ssl.MemoryBIO()
1687 self.assertFalse(bio.eof)
1688 self.assertEqual(bio.read(), b'')
1689 self.assertFalse(bio.eof)
1690 bio.write(b'foo')
1691 self.assertFalse(bio.eof)
1692 bio.write_eof()
1693 self.assertFalse(bio.eof)
1694 self.assertEqual(bio.read(2), b'fo')
1695 self.assertFalse(bio.eof)
1696 self.assertEqual(bio.read(1), b'o')
1697 self.assertTrue(bio.eof)
1698 self.assertEqual(bio.read(), b'')
1699 self.assertTrue(bio.eof)
1700
1701 def test_pending(self):
1702 bio = ssl.MemoryBIO()
1703 self.assertEqual(bio.pending, 0)
1704 bio.write(b'foo')
1705 self.assertEqual(bio.pending, 3)
1706 for i in range(3):
1707 bio.read(1)
1708 self.assertEqual(bio.pending, 3-i-1)
1709 for i in range(3):
1710 bio.write(b'x')
1711 self.assertEqual(bio.pending, i+1)
1712 bio.read()
1713 self.assertEqual(bio.pending, 0)
1714
1715 def test_buffer_types(self):
1716 bio = ssl.MemoryBIO()
1717 bio.write(b'foo')
1718 self.assertEqual(bio.read(), b'foo')
1719 bio.write(bytearray(b'bar'))
1720 self.assertEqual(bio.read(), b'bar')
1721 bio.write(memoryview(b'baz'))
1722 self.assertEqual(bio.read(), b'baz')
1723
1724 def test_error_types(self):
1725 bio = ssl.MemoryBIO()
1726 self.assertRaises(TypeError, bio.write, 'foo')
1727 self.assertRaises(TypeError, bio.write, None)
1728 self.assertRaises(TypeError, bio.write, True)
1729 self.assertRaises(TypeError, bio.write, 1)
1730
1731
Christian Heimes89c20512018-02-27 11:17:32 +01001732class SSLObjectTests(unittest.TestCase):
1733 def test_private_init(self):
1734 bio = ssl.MemoryBIO()
1735 with self.assertRaisesRegex(TypeError, "public constructor"):
1736 ssl.SSLObject(bio, bio)
1737
Miss Islington (bot)c00f7032018-09-21 22:00:42 -07001738 def test_unwrap(self):
1739 client_ctx, server_ctx, hostname = testing_context()
1740 c_in = ssl.MemoryBIO()
1741 c_out = ssl.MemoryBIO()
1742 s_in = ssl.MemoryBIO()
1743 s_out = ssl.MemoryBIO()
1744 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1745 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1746
1747 # Loop on the handshake for a bit to get it settled
1748 for _ in range(5):
1749 try:
1750 client.do_handshake()
1751 except ssl.SSLWantReadError:
1752 pass
1753 if c_out.pending:
1754 s_in.write(c_out.read())
1755 try:
1756 server.do_handshake()
1757 except ssl.SSLWantReadError:
1758 pass
1759 if s_out.pending:
1760 c_in.write(s_out.read())
1761 # Now the handshakes should be complete (don't raise WantReadError)
1762 client.do_handshake()
1763 server.do_handshake()
1764
1765 # Now if we unwrap one side unilaterally, it should send close-notify
1766 # and raise WantReadError:
1767 with self.assertRaises(ssl.SSLWantReadError):
1768 client.unwrap()
1769
1770 # But server.unwrap() does not raise, because it reads the client's
1771 # close-notify:
1772 s_in.write(c_out.read())
1773 server.unwrap()
1774
1775 # And now that the client gets the server's close-notify, it doesn't
1776 # raise either.
1777 c_in.write(s_out.read())
1778 client.unwrap()
Christian Heimes89c20512018-02-27 11:17:32 +01001779
Martin Panter3840b2a2016-03-27 01:53:46 +00001780class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001781 """Tests that connect to a simple server running in the background"""
1782
1783 def setUp(self):
1784 server = ThreadedEchoServer(SIGNED_CERTFILE)
1785 self.server_addr = (HOST, server.port)
1786 server.__enter__()
1787 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001788
Antoine Pitrou480a1242010-04-28 21:37:09 +00001789 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001790 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001791 cert_reqs=ssl.CERT_NONE) as s:
1792 s.connect(self.server_addr)
1793 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001794 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001795
Martin Panter3840b2a2016-03-27 01:53:46 +00001796 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001797 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001798 cert_reqs=ssl.CERT_REQUIRED,
1799 ca_certs=SIGNING_CA) as s:
1800 s.connect(self.server_addr)
1801 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001802 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001803
Martin Panter3840b2a2016-03-27 01:53:46 +00001804 def test_connect_fail(self):
1805 # This should fail because we have no verification certs. Connection
1806 # failure crashes ThreadedEchoServer, so run this in an independent
1807 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001808 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001809 cert_reqs=ssl.CERT_REQUIRED)
1810 self.addCleanup(s.close)
1811 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1812 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001813
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001814 def test_connect_ex(self):
1815 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001816 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001817 cert_reqs=ssl.CERT_REQUIRED,
1818 ca_certs=SIGNING_CA)
1819 self.addCleanup(s.close)
1820 self.assertEqual(0, s.connect_ex(self.server_addr))
1821 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001822
1823 def test_non_blocking_connect_ex(self):
1824 # Issue #11326: non-blocking connect_ex() should allow handshake
1825 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001826 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001827 cert_reqs=ssl.CERT_REQUIRED,
1828 ca_certs=SIGNING_CA,
1829 do_handshake_on_connect=False)
1830 self.addCleanup(s.close)
1831 s.setblocking(False)
1832 rc = s.connect_ex(self.server_addr)
1833 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1834 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1835 # Wait for connect to finish
1836 select.select([], [s], [], 5.0)
1837 # Non-blocking handshake
1838 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001839 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001840 s.do_handshake()
1841 break
1842 except ssl.SSLWantReadError:
1843 select.select([s], [], [], 5.0)
1844 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001845 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001846 # SSL established
1847 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001848
Antoine Pitrou152efa22010-05-16 18:19:27 +00001849 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001850 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001851 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001852 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1853 s.connect(self.server_addr)
1854 self.assertEqual({}, s.getpeercert())
1855 # Same with a server hostname
1856 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1857 server_hostname="dummy") as s:
1858 s.connect(self.server_addr)
1859 ctx.verify_mode = ssl.CERT_REQUIRED
1860 # This should succeed because we specify the root cert
1861 ctx.load_verify_locations(SIGNING_CA)
1862 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1863 s.connect(self.server_addr)
1864 cert = s.getpeercert()
1865 self.assertTrue(cert)
1866
1867 def test_connect_with_context_fail(self):
1868 # This should fail because we have no verification certs. Connection
1869 # failure crashes ThreadedEchoServer, so run this in an independent
1870 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001871 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 ctx.verify_mode = ssl.CERT_REQUIRED
1873 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1874 self.addCleanup(s.close)
1875 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1876 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001877
1878 def test_connect_capath(self):
1879 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001880 # NOTE: the subject hashing algorithm has been changed between
1881 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1882 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001883 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001884 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001885 ctx.verify_mode = ssl.CERT_REQUIRED
1886 ctx.load_verify_locations(capath=CAPATH)
1887 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1888 s.connect(self.server_addr)
1889 cert = s.getpeercert()
1890 self.assertTrue(cert)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07001891
Martin Panter3840b2a2016-03-27 01:53:46 +00001892 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001893 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001894 ctx.verify_mode = ssl.CERT_REQUIRED
1895 ctx.load_verify_locations(capath=BYTES_CAPATH)
1896 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1897 s.connect(self.server_addr)
1898 cert = s.getpeercert()
1899 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001900
Christian Heimesefff7062013-11-21 03:35:02 +01001901 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001902 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001903 pem = f.read()
1904 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001905 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 ctx.verify_mode = ssl.CERT_REQUIRED
1907 ctx.load_verify_locations(cadata=pem)
1908 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1909 s.connect(self.server_addr)
1910 cert = s.getpeercert()
1911 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001912
Martin Panter3840b2a2016-03-27 01:53:46 +00001913 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001915 ctx.verify_mode = ssl.CERT_REQUIRED
1916 ctx.load_verify_locations(cadata=der)
1917 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1918 s.connect(self.server_addr)
1919 cert = s.getpeercert()
1920 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001921
Antoine Pitroue3220242010-04-24 11:13:53 +00001922 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1923 def test_makefile_close(self):
1924 # Issue #5238: creating a file-like object with makefile() shouldn't
1925 # delay closing the underlying "real socket" (here tested with its
1926 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001927 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001928 ss.connect(self.server_addr)
1929 fd = ss.fileno()
1930 f = ss.makefile()
1931 f.close()
1932 # The fd is still open
1933 os.read(fd, 0)
1934 # Closing the SSL socket should close the fd too
1935 ss.close()
1936 gc.collect()
1937 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001938 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001940
Antoine Pitrou480a1242010-04-28 21:37:09 +00001941 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 s = socket.socket(socket.AF_INET)
1943 s.connect(self.server_addr)
1944 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001945 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001946 cert_reqs=ssl.CERT_NONE,
1947 do_handshake_on_connect=False)
1948 self.addCleanup(s.close)
1949 count = 0
1950 while True:
1951 try:
1952 count += 1
1953 s.do_handshake()
1954 break
1955 except ssl.SSLWantReadError:
1956 select.select([s], [], [])
1957 except ssl.SSLWantWriteError:
1958 select.select([], [s], [])
1959 if support.verbose:
1960 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001961
Antoine Pitrou480a1242010-04-28 21:37:09 +00001962 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001963 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001964
Martin Panter3840b2a2016-03-27 01:53:46 +00001965 def test_get_server_certificate_fail(self):
1966 # Connection failure crashes ThreadedEchoServer, so run this in an
1967 # independent test method
1968 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001969
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001970 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001971 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001972 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1973 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001974 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001975 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1976 s.connect(self.server_addr)
1977 # Error checking can happen at instantiation or when connecting
1978 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1979 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001980 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001981 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1982 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001983
Christian Heimes9a5395a2013-06-17 15:44:12 +02001984 def test_get_ca_certs_capath(self):
1985 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001986 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001987 ctx.load_verify_locations(capath=CAPATH)
1988 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001989 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1990 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 s.connect(self.server_addr)
1992 cert = s.getpeercert()
1993 self.assertTrue(cert)
1994 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001995
Christian Heimes575596e2013-12-15 21:49:17 +01001996 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001997 def test_context_setget(self):
1998 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001999 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2000 ctx1.load_verify_locations(capath=CAPATH)
2001 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2002 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002003 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002004 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002005 ss.connect(self.server_addr)
2006 self.assertIs(ss.context, ctx1)
2007 self.assertIs(ss._sslobj.context, ctx1)
2008 ss.context = ctx2
2009 self.assertIs(ss.context, ctx2)
2010 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002011
2012 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2013 # A simple IO loop. Call func(*args) depending on the error we get
2014 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2015 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002016 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002017 count = 0
2018 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002019 if time.monotonic() > deadline:
2020 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002021 errno = None
2022 count += 1
2023 try:
2024 ret = func(*args)
2025 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002026 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002027 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002028 raise
2029 errno = e.errno
2030 # Get any data from the outgoing BIO irrespective of any error, and
2031 # send it to the socket.
2032 buf = outgoing.read()
2033 sock.sendall(buf)
2034 # If there's no error, we're done. For WANT_READ, we need to get
2035 # data from the socket and put it in the incoming BIO.
2036 if errno is None:
2037 break
2038 elif errno == ssl.SSL_ERROR_WANT_READ:
2039 buf = sock.recv(32768)
2040 if buf:
2041 incoming.write(buf)
2042 else:
2043 incoming.write_eof()
2044 if support.verbose:
2045 sys.stdout.write("Needed %d calls to complete %s().\n"
2046 % (count, func.__name__))
2047 return ret
2048
Martin Panter3840b2a2016-03-27 01:53:46 +00002049 def test_bio_handshake(self):
2050 sock = socket.socket(socket.AF_INET)
2051 self.addCleanup(sock.close)
2052 sock.connect(self.server_addr)
2053 incoming = ssl.MemoryBIO()
2054 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002055 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2056 self.assertTrue(ctx.check_hostname)
2057 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002058 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002059 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2060 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002061 self.assertIs(sslobj._sslobj.owner, sslobj)
2062 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002063 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002064 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002065 self.assertRaises(ValueError, sslobj.getpeercert)
2066 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2067 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2068 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2069 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002070 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002071 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002072 self.assertTrue(sslobj.getpeercert())
2073 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2074 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2075 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002076 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002077 except ssl.SSLSyscallError:
2078 # If the server shuts down the TCP connection without sending a
2079 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2080 pass
2081 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2082
2083 def test_bio_read_write_data(self):
2084 sock = socket.socket(socket.AF_INET)
2085 self.addCleanup(sock.close)
2086 sock.connect(self.server_addr)
2087 incoming = ssl.MemoryBIO()
2088 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002090 ctx.verify_mode = ssl.CERT_NONE
2091 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2092 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2093 req = b'FOO\n'
2094 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2095 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2096 self.assertEqual(buf, b'foo\n')
2097 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002098
2099
Martin Panter3840b2a2016-03-27 01:53:46 +00002100class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002101
Martin Panter3840b2a2016-03-27 01:53:46 +00002102 def test_timeout_connect_ex(self):
2103 # Issue #12065: on a timeout, connect_ex() should return the original
2104 # errno (mimicking the behaviour of non-SSL sockets).
2105 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002106 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002107 cert_reqs=ssl.CERT_REQUIRED,
2108 do_handshake_on_connect=False)
2109 self.addCleanup(s.close)
2110 s.settimeout(0.0000001)
2111 rc = s.connect_ex((REMOTE_HOST, 443))
2112 if rc == 0:
2113 self.skipTest("REMOTE_HOST responded too quickly")
2114 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2115
2116 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2117 def test_get_server_certificate_ipv6(self):
2118 with support.transient_internet('ipv6.google.com'):
2119 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2120 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2121
Martin Panter3840b2a2016-03-27 01:53:46 +00002122
2123def _test_get_server_certificate(test, host, port, cert=None):
2124 pem = ssl.get_server_certificate((host, port))
2125 if not pem:
2126 test.fail("No server certificate on %s:%s!" % (host, port))
2127
2128 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2129 if not pem:
2130 test.fail("No server certificate on %s:%s!" % (host, port))
2131 if support.verbose:
2132 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2133
2134def _test_get_server_certificate_fail(test, host, port):
2135 try:
2136 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2137 except ssl.SSLError as x:
2138 #should fail
2139 if support.verbose:
2140 sys.stdout.write("%s\n" % x)
2141 else:
2142 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2143
2144
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002145from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002146
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002147class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002148
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002149 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002150
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002151 """A mildly complicated class, because we want it to work both
2152 with and without the SSL wrapper around the socket connection, so
2153 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002154
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002155 def __init__(self, server, connsock, addr):
2156 self.server = server
2157 self.running = False
2158 self.sock = connsock
2159 self.addr = addr
2160 self.sock.setblocking(1)
2161 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002162 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002163 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002164
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002165 def wrap_conn(self):
2166 try:
2167 self.sslconn = self.server.context.wrap_socket(
2168 self.sock, server_side=True)
2169 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2170 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002171 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002172 # We treat ConnectionResetError as though it were an
2173 # SSLError - OpenSSL on Ubuntu abruptly closes the
2174 # connection when asked to use an unsupported protocol.
2175 #
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002176 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2177 # tries to send session tickets after handshake.
2178 # https://github.com/openssl/openssl/issues/6342
2179 self.server.conn_errors.append(str(e))
2180 if self.server.chatty:
2181 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2182 self.running = False
2183 self.close()
2184 return False
2185 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002186 # OSError may occur with wrong protocols, e.g. both
2187 # sides use PROTOCOL_TLS_SERVER.
2188 #
2189 # XXX Various errors can have happened here, for example
2190 # a mismatching protocol version, an invalid certificate,
2191 # or a low-level bug. This should be made more discriminating.
2192 #
2193 # bpo-31323: Store the exception as string to prevent
2194 # a reference leak: server -> conn_errors -> exception
2195 # -> traceback -> self (ConnectionHandler) -> server
2196 self.server.conn_errors.append(str(e))
2197 if self.server.chatty:
2198 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2199 self.running = False
2200 self.server.stop()
2201 self.close()
2202 return False
2203 else:
2204 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2205 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2206 cert = self.sslconn.getpeercert()
2207 if support.verbose and self.server.chatty:
2208 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2209 cert_binary = self.sslconn.getpeercert(True)
2210 if support.verbose and self.server.chatty:
2211 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2212 cipher = self.sslconn.cipher()
2213 if support.verbose and self.server.chatty:
2214 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2215 sys.stdout.write(" server: selected protocol is now "
2216 + str(self.sslconn.selected_npn_protocol()) + "\n")
2217 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002218
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002219 def read(self):
2220 if self.sslconn:
2221 return self.sslconn.read()
2222 else:
2223 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002224
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002225 def write(self, bytes):
2226 if self.sslconn:
2227 return self.sslconn.write(bytes)
2228 else:
2229 return self.sock.send(bytes)
2230
2231 def close(self):
2232 if self.sslconn:
2233 self.sslconn.close()
2234 else:
2235 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002236
Antoine Pitrou480a1242010-04-28 21:37:09 +00002237 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002238 self.running = True
2239 if not self.server.starttls_server:
2240 if not self.wrap_conn():
2241 return
2242 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002243 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002244 msg = self.read()
2245 stripped = msg.strip()
2246 if not stripped:
2247 # eof, so quit this handler
2248 self.running = False
2249 try:
2250 self.sock = self.sslconn.unwrap()
2251 except OSError:
2252 # Many tests shut the TCP connection down
2253 # without an SSL shutdown. This causes
2254 # unwrap() to raise OSError with errno=0!
2255 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002256 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002257 self.sslconn = None
2258 self.close()
2259 elif stripped == b'over':
2260 if support.verbose and self.server.connectionchatty:
2261 sys.stdout.write(" server: client closed connection\n")
2262 self.close()
2263 return
2264 elif (self.server.starttls_server and
2265 stripped == b'STARTTLS'):
2266 if support.verbose and self.server.connectionchatty:
2267 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2268 self.write(b"OK\n")
2269 if not self.wrap_conn():
2270 return
2271 elif (self.server.starttls_server and self.sslconn
2272 and stripped == b'ENDTLS'):
2273 if support.verbose and self.server.connectionchatty:
2274 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2275 self.write(b"OK\n")
2276 self.sock = self.sslconn.unwrap()
2277 self.sslconn = None
2278 if support.verbose and self.server.connectionchatty:
2279 sys.stdout.write(" server: connection is now unencrypted...\n")
2280 elif stripped == b'CB tls-unique':
2281 if support.verbose and self.server.connectionchatty:
2282 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2283 data = self.sslconn.get_channel_binding("tls-unique")
2284 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes2756ef32018-09-23 09:22:52 +02002285 elif stripped == b'PHA':
2286 if support.verbose and self.server.connectionchatty:
2287 sys.stdout.write(" server: initiating post handshake auth\n")
2288 try:
2289 self.sslconn.verify_client_post_handshake()
2290 except ssl.SSLError as e:
2291 self.write(repr(e).encode("us-ascii") + b"\n")
2292 else:
2293 self.write(b"OK\n")
2294 elif stripped == b'HASCERT':
2295 if self.sslconn.getpeercert() is not None:
2296 self.write(b'TRUE\n')
2297 else:
2298 self.write(b'FALSE\n')
2299 elif stripped == b'GETCERT':
2300 cert = self.sslconn.getpeercert()
2301 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002302 else:
2303 if (support.verbose and
2304 self.server.connectionchatty):
2305 ctype = (self.sslconn and "encrypted") or "unencrypted"
2306 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2307 % (msg, ctype, msg.lower(), ctype))
2308 self.write(msg.lower())
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002309 except ConnectionResetError:
2310 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2311 # when connection is not shut down gracefully.
2312 if self.server.chatty and support.verbose:
2313 sys.stdout.write(
2314 " Connection reset by peer: {}\n".format(
2315 self.addr)
2316 )
2317 self.close()
2318 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002319 except OSError:
2320 if self.server.chatty:
2321 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002322 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002323 self.running = False
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 # normally, we'd just stop here, but for the test
2326 # harness, we want to stop the server
2327 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002328
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002329 def __init__(self, certificate=None, ssl_version=None,
2330 certreqs=None, cacerts=None,
2331 chatty=True, connectionchatty=False, starttls_server=False,
2332 npn_protocols=None, alpn_protocols=None,
2333 ciphers=None, context=None):
2334 if context:
2335 self.context = context
2336 else:
2337 self.context = ssl.SSLContext(ssl_version
2338 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002339 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002340 self.context.verify_mode = (certreqs if certreqs is not None
2341 else ssl.CERT_NONE)
2342 if cacerts:
2343 self.context.load_verify_locations(cacerts)
2344 if certificate:
2345 self.context.load_cert_chain(certificate)
2346 if npn_protocols:
2347 self.context.set_npn_protocols(npn_protocols)
2348 if alpn_protocols:
2349 self.context.set_alpn_protocols(alpn_protocols)
2350 if ciphers:
2351 self.context.set_ciphers(ciphers)
2352 self.chatty = chatty
2353 self.connectionchatty = connectionchatty
2354 self.starttls_server = starttls_server
2355 self.sock = socket.socket()
2356 self.port = support.bind_port(self.sock)
2357 self.flag = None
2358 self.active = False
2359 self.selected_npn_protocols = []
2360 self.selected_alpn_protocols = []
2361 self.shared_ciphers = []
2362 self.conn_errors = []
2363 threading.Thread.__init__(self)
2364 self.daemon = True
2365
2366 def __enter__(self):
2367 self.start(threading.Event())
2368 self.flag.wait()
2369 return self
2370
2371 def __exit__(self, *args):
2372 self.stop()
2373 self.join()
2374
2375 def start(self, flag=None):
2376 self.flag = flag
2377 threading.Thread.start(self)
2378
2379 def run(self):
2380 self.sock.settimeout(0.05)
2381 self.sock.listen()
2382 self.active = True
2383 if self.flag:
2384 # signal an event
2385 self.flag.set()
2386 while self.active:
2387 try:
2388 newconn, connaddr = self.sock.accept()
2389 if support.verbose and self.chatty:
2390 sys.stdout.write(' server: new connection from '
2391 + repr(connaddr) + '\n')
2392 handler = self.ConnectionHandler(self, newconn, connaddr)
2393 handler.start()
2394 handler.join()
2395 except socket.timeout:
2396 pass
2397 except KeyboardInterrupt:
2398 self.stop()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002399 except BaseException as e:
2400 if support.verbose and self.chatty:
2401 sys.stdout.write(
2402 ' connection handling failed: ' + repr(e) + '\n')
2403
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002404 self.sock.close()
2405
2406 def stop(self):
2407 self.active = False
2408
2409class AsyncoreEchoServer(threading.Thread):
2410
2411 # this one's based on asyncore.dispatcher
2412
2413 class EchoServer (asyncore.dispatcher):
2414
2415 class ConnectionHandler(asyncore.dispatcher_with_send):
2416
2417 def __init__(self, conn, certfile):
2418 self.socket = test_wrap_socket(conn, server_side=True,
2419 certfile=certfile,
2420 do_handshake_on_connect=False)
2421 asyncore.dispatcher_with_send.__init__(self, self.socket)
2422 self._ssl_accepting = True
2423 self._do_ssl_handshake()
2424
2425 def readable(self):
2426 if isinstance(self.socket, ssl.SSLSocket):
2427 while self.socket.pending() > 0:
2428 self.handle_read_event()
2429 return True
2430
2431 def _do_ssl_handshake(self):
2432 try:
2433 self.socket.do_handshake()
2434 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2435 return
2436 except ssl.SSLEOFError:
2437 return self.handle_close()
2438 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002439 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002440 except OSError as err:
2441 if err.args[0] == errno.ECONNABORTED:
2442 return self.handle_close()
2443 else:
2444 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002445
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002446 def handle_read(self):
2447 if self._ssl_accepting:
2448 self._do_ssl_handshake()
2449 else:
2450 data = self.recv(1024)
2451 if support.verbose:
2452 sys.stdout.write(" server: read %s from client\n" % repr(data))
2453 if not data:
2454 self.close()
2455 else:
2456 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002457
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002458 def handle_close(self):
2459 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002460 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002461 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002462
2463 def handle_error(self):
2464 raise
2465
Trent Nelson78520002008-04-10 20:54:35 +00002466 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002467 self.certfile = certfile
2468 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2469 self.port = support.bind_port(sock, '')
2470 asyncore.dispatcher.__init__(self, sock)
2471 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002472
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002473 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002474 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2476 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002477
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002478 def handle_error(self):
2479 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002480
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002481 def __init__(self, certfile):
2482 self.flag = None
2483 self.active = False
2484 self.server = self.EchoServer(certfile)
2485 self.port = self.server.port
2486 threading.Thread.__init__(self)
2487 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002488
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002489 def __str__(self):
2490 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002491
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002492 def __enter__(self):
2493 self.start(threading.Event())
2494 self.flag.wait()
2495 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002496
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002497 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002498 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002499 sys.stdout.write(" cleanup: stopping server.\n")
2500 self.stop()
2501 if support.verbose:
2502 sys.stdout.write(" cleanup: joining server thread.\n")
2503 self.join()
2504 if support.verbose:
2505 sys.stdout.write(" cleanup: successfully joined.\n")
2506 # make sure that ConnectionHandler is removed from socket_map
2507 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002508
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002509 def start (self, flag=None):
2510 self.flag = flag
2511 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002512
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002513 def run(self):
2514 self.active = True
2515 if self.flag:
2516 self.flag.set()
2517 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002518 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002519 asyncore.loop(1)
2520 except:
2521 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002522
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002523 def stop(self):
2524 self.active = False
2525 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002526
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527def server_params_test(client_context, server_context, indata=b"FOO\n",
2528 chatty=True, connectionchatty=False, sni_name=None,
2529 session=None):
2530 """
2531 Launch a server, connect a client to it and try various reads
2532 and writes.
2533 """
2534 stats = {}
2535 server = ThreadedEchoServer(context=server_context,
2536 chatty=chatty,
2537 connectionchatty=False)
2538 with server:
2539 with client_context.wrap_socket(socket.socket(),
2540 server_hostname=sni_name, session=session) as s:
2541 s.connect((HOST, server.port))
2542 for arg in [indata, bytearray(indata), memoryview(indata)]:
2543 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002544 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002545 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002546 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002547 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002548 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002549 if connectionchatty:
2550 if support.verbose:
2551 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002552 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002553 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002554 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2555 % (outdata[:20], len(outdata),
2556 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 s.write(b"over\n")
2558 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002559 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002560 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 stats.update({
2562 'compression': s.compression(),
2563 'cipher': s.cipher(),
2564 'peercert': s.getpeercert(),
2565 'client_alpn_protocol': s.selected_alpn_protocol(),
2566 'client_npn_protocol': s.selected_npn_protocol(),
2567 'version': s.version(),
2568 'session_reused': s.session_reused,
2569 'session': s.session,
2570 })
2571 s.close()
2572 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2573 stats['server_npn_protocols'] = server.selected_npn_protocols
2574 stats['server_shared_ciphers'] = server.shared_ciphers
2575 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002576
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002577def try_protocol_combo(server_protocol, client_protocol, expect_success,
2578 certsreqs=None, server_options=0, client_options=0):
2579 """
2580 Try to SSL-connect using *client_protocol* to *server_protocol*.
2581 If *expect_success* is true, assert that the connection succeeds,
2582 if it's false, assert that the connection fails.
2583 Also, if *expect_success* is a string, assert that it is the protocol
2584 version actually used by the connection.
2585 """
2586 if certsreqs is None:
2587 certsreqs = ssl.CERT_NONE
2588 certtype = {
2589 ssl.CERT_NONE: "CERT_NONE",
2590 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2591 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2592 }[certsreqs]
2593 if support.verbose:
2594 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2595 sys.stdout.write(formatstr %
2596 (ssl.get_protocol_name(client_protocol),
2597 ssl.get_protocol_name(server_protocol),
2598 certtype))
2599 client_context = ssl.SSLContext(client_protocol)
2600 client_context.options |= client_options
2601 server_context = ssl.SSLContext(server_protocol)
2602 server_context.options |= server_options
2603
2604 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2605 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2606 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002607 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002608 client_context.set_ciphers("ALL")
2609
2610 for ctx in (client_context, server_context):
2611 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002612 ctx.load_cert_chain(SIGNED_CERTFILE)
2613 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002614 try:
2615 stats = server_params_test(client_context, server_context,
2616 chatty=False, connectionchatty=False)
2617 # Protocol mismatch can result in either an SSLError, or a
2618 # "Connection reset by peer" error.
2619 except ssl.SSLError:
2620 if expect_success:
2621 raise
2622 except OSError as e:
2623 if expect_success or e.errno != errno.ECONNRESET:
2624 raise
2625 else:
2626 if not expect_success:
2627 raise AssertionError(
2628 "Client protocol %s succeeded with server protocol %s!"
2629 % (ssl.get_protocol_name(client_protocol),
2630 ssl.get_protocol_name(server_protocol)))
2631 elif (expect_success is not True
2632 and expect_success != stats['version']):
2633 raise AssertionError("version mismatch: expected %r, got %r"
2634 % (expect_success, stats['version']))
2635
2636
2637class ThreadedTests(unittest.TestCase):
2638
2639 @skip_if_broken_ubuntu_ssl
2640 def test_echo(self):
2641 """Basic test of an SSL client connecting to a server"""
2642 if support.verbose:
2643 sys.stdout.write("\n")
2644 for protocol in PROTOCOLS:
2645 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2646 continue
2647 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2648 context = ssl.SSLContext(protocol)
2649 context.load_cert_chain(CERTFILE)
2650 server_params_test(context, context,
2651 chatty=True, connectionchatty=True)
2652
Christian Heimesa170fa12017-09-15 20:27:30 +02002653 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002654
2655 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2656 server_params_test(client_context=client_context,
2657 server_context=server_context,
2658 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002659 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002660
2661 client_context.check_hostname = False
2662 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2663 with self.assertRaises(ssl.SSLError) as e:
2664 server_params_test(client_context=server_context,
2665 server_context=client_context,
2666 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002667 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002668 self.assertIn('called a function you should not call',
2669 str(e.exception))
2670
2671 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2672 with self.assertRaises(ssl.SSLError) as e:
2673 server_params_test(client_context=server_context,
2674 server_context=server_context,
2675 chatty=True, connectionchatty=True)
2676 self.assertIn('called a function you should not call',
2677 str(e.exception))
2678
2679 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2680 with self.assertRaises(ssl.SSLError) as e:
2681 server_params_test(client_context=server_context,
2682 server_context=client_context,
2683 chatty=True, connectionchatty=True)
2684 self.assertIn('called a function you should not call',
2685 str(e.exception))
2686
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002687 def test_getpeercert(self):
2688 if support.verbose:
2689 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002690
2691 client_context, server_context, hostname = testing_context()
2692 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002693 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002694 with client_context.wrap_socket(socket.socket(),
2695 do_handshake_on_connect=False,
2696 server_hostname=hostname) as s:
2697 s.connect((HOST, server.port))
2698 # getpeercert() raise ValueError while the handshake isn't
2699 # done.
2700 with self.assertRaises(ValueError):
2701 s.getpeercert()
2702 s.do_handshake()
2703 cert = s.getpeercert()
2704 self.assertTrue(cert, "Can't get peer certificate.")
2705 cipher = s.cipher()
2706 if support.verbose:
2707 sys.stdout.write(pprint.pformat(cert) + '\n')
2708 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2709 if 'subject' not in cert:
2710 self.fail("No subject field in certificate: %s." %
2711 pprint.pformat(cert))
2712 if ((('organizationName', 'Python Software Foundation'),)
2713 not in cert['subject']):
2714 self.fail(
2715 "Missing or invalid 'organizationName' field in certificate subject; "
2716 "should be 'Python Software Foundation'.")
2717 self.assertIn('notBefore', cert)
2718 self.assertIn('notAfter', cert)
2719 before = ssl.cert_time_to_seconds(cert['notBefore'])
2720 after = ssl.cert_time_to_seconds(cert['notAfter'])
2721 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002722
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002723 @unittest.skipUnless(have_verify_flags(),
2724 "verify_flags need OpenSSL > 0.9.8")
2725 def test_crl_check(self):
2726 if support.verbose:
2727 sys.stdout.write("\n")
2728
Christian Heimesa170fa12017-09-15 20:27:30 +02002729 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002730
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002731 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002732 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002733
2734 # VERIFY_DEFAULT should pass
2735 server = ThreadedEchoServer(context=server_context, chatty=True)
2736 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002737 with client_context.wrap_socket(socket.socket(),
2738 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002739 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002740 cert = s.getpeercert()
2741 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002742
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002743 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002744 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002745
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002746 server = ThreadedEchoServer(context=server_context, chatty=True)
2747 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002748 with client_context.wrap_socket(socket.socket(),
2749 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002750 with self.assertRaisesRegex(ssl.SSLError,
2751 "certificate verify failed"):
2752 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002753
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002754 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002755 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002756
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002757 server = ThreadedEchoServer(context=server_context, chatty=True)
2758 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002759 with client_context.wrap_socket(socket.socket(),
2760 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002761 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002762 cert = s.getpeercert()
2763 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002764
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002765 def test_check_hostname(self):
2766 if support.verbose:
2767 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002768
Christian Heimesa170fa12017-09-15 20:27:30 +02002769 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002770
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002771 # correct hostname should verify
2772 server = ThreadedEchoServer(context=server_context, chatty=True)
2773 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002774 with client_context.wrap_socket(socket.socket(),
2775 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002776 s.connect((HOST, server.port))
2777 cert = s.getpeercert()
2778 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002779
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 # incorrect hostname should raise an exception
2781 server = ThreadedEchoServer(context=server_context, chatty=True)
2782 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002783 with client_context.wrap_socket(socket.socket(),
2784 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002785 with self.assertRaisesRegex(
2786 ssl.CertificateError,
2787 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002788 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002789
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002790 # missing server_hostname arg should cause an exception, too
2791 server = ThreadedEchoServer(context=server_context, chatty=True)
2792 with server:
2793 with socket.socket() as s:
2794 with self.assertRaisesRegex(ValueError,
2795 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002796 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002797
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002798 def test_ecc_cert(self):
2799 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2800 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07002801 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002802 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2803
2804 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2805 # load ECC cert
2806 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2807
2808 # correct hostname should verify
2809 server = ThreadedEchoServer(context=server_context, chatty=True)
2810 with server:
2811 with client_context.wrap_socket(socket.socket(),
2812 server_hostname=hostname) as s:
2813 s.connect((HOST, server.port))
2814 cert = s.getpeercert()
2815 self.assertTrue(cert, "Can't get peer certificate.")
2816 cipher = s.cipher()[0].split('-')
2817 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2818
2819 def test_dual_rsa_ecc(self):
2820 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2821 client_context.load_verify_locations(SIGNING_CA)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002822 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2823 # algorithms.
2824 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002825 # only ECDSA certs
2826 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2827 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2828
2829 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2830 # load ECC and RSA key/cert pairs
2831 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2832 server_context.load_cert_chain(SIGNED_CERTFILE)
2833
2834 # correct hostname should verify
2835 server = ThreadedEchoServer(context=server_context, chatty=True)
2836 with server:
2837 with client_context.wrap_socket(socket.socket(),
2838 server_hostname=hostname) as s:
2839 s.connect((HOST, server.port))
2840 cert = s.getpeercert()
2841 self.assertTrue(cert, "Can't get peer certificate.")
2842 cipher = s.cipher()[0].split('-')
2843 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2844
Christian Heimes66e57422018-01-29 14:25:13 +01002845 def test_check_hostname_idn(self):
2846 if support.verbose:
2847 sys.stdout.write("\n")
2848
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002849 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002850 server_context.load_cert_chain(IDNSANSFILE)
2851
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002852 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002853 context.verify_mode = ssl.CERT_REQUIRED
2854 context.check_hostname = True
2855 context.load_verify_locations(SIGNING_CA)
2856
2857 # correct hostname should verify, when specified in several
2858 # different ways
2859 idn_hostnames = [
2860 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002861 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002862 ('xn--knig-5qa.idn.pythontest.net',
2863 'xn--knig-5qa.idn.pythontest.net'),
2864 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002865 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002866
2867 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002868 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002869 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2870 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2871 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002872 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2873
2874 # ('königsgäßchen.idna2008.pythontest.net',
2875 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2876 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2877 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2878 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2879 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2880
Christian Heimes66e57422018-01-29 14:25:13 +01002881 ]
2882 for server_hostname, expected_hostname in idn_hostnames:
2883 server = ThreadedEchoServer(context=server_context, chatty=True)
2884 with server:
2885 with context.wrap_socket(socket.socket(),
2886 server_hostname=server_hostname) as s:
2887 self.assertEqual(s.server_hostname, expected_hostname)
2888 s.connect((HOST, server.port))
2889 cert = s.getpeercert()
2890 self.assertEqual(s.server_hostname, expected_hostname)
2891 self.assertTrue(cert, "Can't get peer certificate.")
2892
Christian Heimes66e57422018-01-29 14:25:13 +01002893 # incorrect hostname should raise an exception
2894 server = ThreadedEchoServer(context=server_context, chatty=True)
2895 with server:
2896 with context.wrap_socket(socket.socket(),
2897 server_hostname="python.example.org") as s:
2898 with self.assertRaises(ssl.CertificateError):
2899 s.connect((HOST, server.port))
2900
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002901 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002902 """Connecting when the server rejects the client's certificate
2903
2904 Launch a server with CERT_REQUIRED, and check that trying to
2905 connect to it with a wrong client certificate fails.
2906 """
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002907 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002908 # load client cert that is not signed by trusted CA
2909 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002910 # require TLS client authentication
2911 server_context.verify_mode = ssl.CERT_REQUIRED
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002912 # TLS 1.3 has different handshake
2913 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002914
2915 server = ThreadedEchoServer(
2916 context=server_context, chatty=True, connectionchatty=True,
2917 )
2918
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002919 with server, \
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08002920 client_context.wrap_socket(socket.socket(),
2921 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002922 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002923 # Expect either an SSL error about the server rejecting
2924 # the connection, or a low-level connection reset (which
2925 # sometimes happens on Windows)
2926 s.connect((HOST, server.port))
2927 except ssl.SSLError as e:
2928 if support.verbose:
2929 sys.stdout.write("\nSSLError is %r\n" % e)
2930 except OSError as e:
2931 if e.errno != errno.ECONNRESET:
2932 raise
2933 if support.verbose:
2934 sys.stdout.write("\nsocket.error is %r\n" % e)
2935 else:
2936 self.fail("Use of invalid cert should have failed!")
2937
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002938 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2939 def test_wrong_cert_tls13(self):
2940 client_context, server_context, hostname = testing_context()
Miss Islington (bot)e3228a32018-08-14 10:52:27 -04002941 # load client cert that is not signed by trusted CA
2942 client_context.load_cert_chain(CERTFILE)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07002943 server_context.verify_mode = ssl.CERT_REQUIRED
2944 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2945 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2946
2947 server = ThreadedEchoServer(
2948 context=server_context, chatty=True, connectionchatty=True,
2949 )
2950 with server, \
2951 client_context.wrap_socket(socket.socket(),
2952 server_hostname=hostname) as s:
2953 # TLS 1.3 perform client cert exchange after handshake
2954 s.connect((HOST, server.port))
2955 try:
2956 s.write(b'data')
2957 s.read(4)
2958 except ssl.SSLError as e:
2959 if support.verbose:
2960 sys.stdout.write("\nSSLError is %r\n" % e)
2961 except OSError as e:
2962 if e.errno != errno.ECONNRESET:
2963 raise
2964 if support.verbose:
2965 sys.stdout.write("\nsocket.error is %r\n" % e)
2966 else:
2967 self.fail("Use of invalid cert should have failed!")
2968
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002969 def test_rude_shutdown(self):
2970 """A brutal shutdown of an SSL server should raise an OSError
2971 in the client when attempting handshake.
2972 """
2973 listener_ready = threading.Event()
2974 listener_gone = threading.Event()
2975
2976 s = socket.socket()
2977 port = support.bind_port(s, HOST)
2978
2979 # `listener` runs in a thread. It sits in an accept() until
2980 # the main thread connects. Then it rudely closes the socket,
2981 # and sets Event `listener_gone` to let the main thread know
2982 # the socket is gone.
2983 def listener():
2984 s.listen()
2985 listener_ready.set()
2986 newsock, addr = s.accept()
2987 newsock.close()
2988 s.close()
2989 listener_gone.set()
2990
2991 def connector():
2992 listener_ready.wait()
2993 with socket.socket() as c:
2994 c.connect((HOST, port))
2995 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002996 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002997 ssl_sock = test_wrap_socket(c)
2998 except OSError:
2999 pass
3000 else:
3001 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003002
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003003 t = threading.Thread(target=listener)
3004 t.start()
3005 try:
3006 connector()
3007 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003008 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003009
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003010 def test_ssl_cert_verify_error(self):
3011 if support.verbose:
3012 sys.stdout.write("\n")
3013
3014 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3015 server_context.load_cert_chain(SIGNED_CERTFILE)
3016
3017 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3018
3019 server = ThreadedEchoServer(context=server_context, chatty=True)
3020 with server:
3021 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003022 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003023 try:
3024 s.connect((HOST, server.port))
3025 except ssl.SSLError as e:
3026 msg = 'unable to get local issuer certificate'
3027 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3028 self.assertEqual(e.verify_code, 20)
3029 self.assertEqual(e.verify_message, msg)
3030 self.assertIn(msg, repr(e))
3031 self.assertIn('certificate verify failed', repr(e))
3032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 @skip_if_broken_ubuntu_ssl
3034 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3035 "OpenSSL is compiled without SSLv2 support")
3036 def test_protocol_sslv2(self):
3037 """Connecting to an SSLv2 server with various client options"""
3038 if support.verbose:
3039 sys.stdout.write("\n")
3040 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3041 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3042 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003043 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003044 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3045 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3046 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3047 # SSLv23 client with specific SSL options
3048 if no_sslv2_implies_sslv3_hello():
3049 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003050 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003051 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003052 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003053 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003054 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003055 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003056
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003057 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02003058 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003059 """Connecting to an SSLv23 server with various client options"""
3060 if support.verbose:
3061 sys.stdout.write("\n")
3062 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003063 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003064 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003065 except OSError as x:
3066 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3067 if support.verbose:
3068 sys.stdout.write(
3069 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3070 % str(x))
3071 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003072 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3073 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3074 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003075
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003076 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003077 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3078 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3079 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003080
3081 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003082 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3083 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3084 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003085
3086 # Server with specific SSL options
3087 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003088 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003089 server_options=ssl.OP_NO_SSLv3)
3090 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003091 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003092 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003093 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003094 server_options=ssl.OP_NO_TLSv1)
3095
3096
3097 @skip_if_broken_ubuntu_ssl
3098 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3099 "OpenSSL is compiled without SSLv3 support")
3100 def test_protocol_sslv3(self):
3101 """Connecting to an SSLv3 server with various client options"""
3102 if support.verbose:
3103 sys.stdout.write("\n")
3104 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3105 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3106 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3107 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3108 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003109 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003110 client_options=ssl.OP_NO_SSLv3)
3111 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3112 if no_sslv2_implies_sslv3_hello():
3113 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003114 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003115 False, client_options=ssl.OP_NO_SSLv2)
3116
3117 @skip_if_broken_ubuntu_ssl
3118 def test_protocol_tlsv1(self):
3119 """Connecting to a TLSv1 server with various client options"""
3120 if support.verbose:
3121 sys.stdout.write("\n")
3122 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3123 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3124 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3125 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3126 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3127 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3128 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003129 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003130 client_options=ssl.OP_NO_TLSv1)
3131
3132 @skip_if_broken_ubuntu_ssl
3133 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3134 "TLS version 1.1 not supported.")
3135 def test_protocol_tlsv1_1(self):
3136 """Connecting to a TLSv1.1 server with various client options.
3137 Testing against older TLS versions."""
3138 if support.verbose:
3139 sys.stdout.write("\n")
3140 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3141 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3142 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3143 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3144 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003145 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003146 client_options=ssl.OP_NO_TLSv1_1)
3147
Christian Heimesa170fa12017-09-15 20:27:30 +02003148 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003149 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3150 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3151
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003152 @skip_if_broken_ubuntu_ssl
3153 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3154 "TLS version 1.2 not supported.")
3155 def test_protocol_tlsv1_2(self):
3156 """Connecting to a TLSv1.2 server with various client options.
3157 Testing against older TLS versions."""
3158 if support.verbose:
3159 sys.stdout.write("\n")
3160 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3161 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3162 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3163 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3164 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3165 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3166 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003167 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003168 client_options=ssl.OP_NO_TLSv1_2)
3169
Christian Heimesa170fa12017-09-15 20:27:30 +02003170 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003171 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3172 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3173 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3174 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3175
3176 def test_starttls(self):
3177 """Switching from clear text to encrypted and back again."""
3178 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3179
3180 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003181 starttls_server=True,
3182 chatty=True,
3183 connectionchatty=True)
3184 wrapped = False
3185 with server:
3186 s = socket.socket()
3187 s.setblocking(1)
3188 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003189 if support.verbose:
3190 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003191 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003192 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003193 sys.stdout.write(
3194 " client: sending %r...\n" % indata)
3195 if wrapped:
3196 conn.write(indata)
3197 outdata = conn.read()
3198 else:
3199 s.send(indata)
3200 outdata = s.recv(1024)
3201 msg = outdata.strip().lower()
3202 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3203 # STARTTLS ok, switch to secure mode
3204 if support.verbose:
3205 sys.stdout.write(
3206 " client: read %r from server, starting TLS...\n"
3207 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003208 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 wrapped = True
3210 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3211 # ENDTLS ok, switch back to clear text
3212 if support.verbose:
3213 sys.stdout.write(
3214 " client: read %r from server, ending TLS...\n"
3215 % msg)
3216 s = conn.unwrap()
3217 wrapped = False
3218 else:
3219 if support.verbose:
3220 sys.stdout.write(
3221 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003222 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003223 sys.stdout.write(" client: closing connection.\n")
3224 if wrapped:
3225 conn.write(b"over\n")
3226 else:
3227 s.send(b"over\n")
3228 if wrapped:
3229 conn.close()
3230 else:
3231 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003232
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003233 def test_socketserver(self):
3234 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003235 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003236 # try to connect
3237 if support.verbose:
3238 sys.stdout.write('\n')
3239 with open(CERTFILE, 'rb') as f:
3240 d1 = f.read()
3241 d2 = ''
3242 # now fetch the same data from the HTTPS server
3243 url = 'https://localhost:%d/%s' % (
3244 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003245 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003246 f = urllib.request.urlopen(url, context=context)
3247 try:
3248 dlen = f.info().get("content-length")
3249 if dlen and (int(dlen) > 0):
3250 d2 = f.read(int(dlen))
3251 if support.verbose:
3252 sys.stdout.write(
3253 " client: read %d bytes from remote server '%s'\n"
3254 % (len(d2), server))
3255 finally:
3256 f.close()
3257 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003258
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003259 def test_asyncore_server(self):
3260 """Check the example asyncore integration."""
3261 if support.verbose:
3262 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003263
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003264 indata = b"FOO\n"
3265 server = AsyncoreEchoServer(CERTFILE)
3266 with server:
3267 s = test_wrap_socket(socket.socket())
3268 s.connect(('127.0.0.1', server.port))
3269 if support.verbose:
3270 sys.stdout.write(
3271 " client: sending %r...\n" % indata)
3272 s.write(indata)
3273 outdata = s.read()
3274 if support.verbose:
3275 sys.stdout.write(" client: read %r\n" % outdata)
3276 if outdata != indata.lower():
3277 self.fail(
3278 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3279 % (outdata[:20], len(outdata),
3280 indata[:20].lower(), len(indata)))
3281 s.write(b"over\n")
3282 if support.verbose:
3283 sys.stdout.write(" client: closing connection.\n")
3284 s.close()
3285 if support.verbose:
3286 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003288 def test_recv_send(self):
3289 """Test recv(), send() and friends."""
3290 if support.verbose:
3291 sys.stdout.write("\n")
3292
3293 server = ThreadedEchoServer(CERTFILE,
3294 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003295 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003296 cacerts=CERTFILE,
3297 chatty=True,
3298 connectionchatty=False)
3299 with server:
3300 s = test_wrap_socket(socket.socket(),
3301 server_side=False,
3302 certfile=CERTFILE,
3303 ca_certs=CERTFILE,
3304 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003305 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003306 s.connect((HOST, server.port))
3307 # helper methods for standardising recv* method signatures
3308 def _recv_into():
3309 b = bytearray(b"\0"*100)
3310 count = s.recv_into(b)
3311 return b[:count]
3312
3313 def _recvfrom_into():
3314 b = bytearray(b"\0"*100)
3315 count, addr = s.recvfrom_into(b)
3316 return b[:count]
3317
3318 # (name, method, expect success?, *args, return value func)
3319 send_methods = [
3320 ('send', s.send, True, [], len),
3321 ('sendto', s.sendto, False, ["some.address"], len),
3322 ('sendall', s.sendall, True, [], lambda x: None),
3323 ]
3324 # (name, method, whether to expect success, *args)
3325 recv_methods = [
3326 ('recv', s.recv, True, []),
3327 ('recvfrom', s.recvfrom, False, ["some.address"]),
3328 ('recv_into', _recv_into, True, []),
3329 ('recvfrom_into', _recvfrom_into, False, []),
3330 ]
3331 data_prefix = "PREFIX_"
3332
3333 for (meth_name, send_meth, expect_success, args,
3334 ret_val_meth) in send_methods:
3335 indata = (data_prefix + meth_name).encode('ascii')
3336 try:
3337 ret = send_meth(indata, *args)
3338 msg = "sending with {}".format(meth_name)
3339 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3340 outdata = s.read()
3341 if outdata != indata.lower():
3342 self.fail(
3343 "While sending with <<{name:s}>> bad data "
3344 "<<{outdata:r}>> ({nout:d}) received; "
3345 "expected <<{indata:r}>> ({nin:d})\n".format(
3346 name=meth_name, outdata=outdata[:20],
3347 nout=len(outdata),
3348 indata=indata[:20], nin=len(indata)
3349 )
3350 )
3351 except ValueError as e:
3352 if expect_success:
3353 self.fail(
3354 "Failed to send with method <<{name:s}>>; "
3355 "expected to succeed.\n".format(name=meth_name)
3356 )
3357 if not str(e).startswith(meth_name):
3358 self.fail(
3359 "Method <<{name:s}>> failed with unexpected "
3360 "exception message: {exp:s}\n".format(
3361 name=meth_name, exp=e
3362 )
3363 )
3364
3365 for meth_name, recv_meth, expect_success, args in recv_methods:
3366 indata = (data_prefix + meth_name).encode('ascii')
3367 try:
3368 s.send(indata)
3369 outdata = recv_meth(*args)
3370 if outdata != indata.lower():
3371 self.fail(
3372 "While receiving with <<{name:s}>> bad data "
3373 "<<{outdata:r}>> ({nout:d}) received; "
3374 "expected <<{indata:r}>> ({nin:d})\n".format(
3375 name=meth_name, outdata=outdata[:20],
3376 nout=len(outdata),
3377 indata=indata[:20], nin=len(indata)
3378 )
3379 )
3380 except ValueError as e:
3381 if expect_success:
3382 self.fail(
3383 "Failed to receive with method <<{name:s}>>; "
3384 "expected to succeed.\n".format(name=meth_name)
3385 )
3386 if not str(e).startswith(meth_name):
3387 self.fail(
3388 "Method <<{name:s}>> failed with unexpected "
3389 "exception message: {exp:s}\n".format(
3390 name=meth_name, exp=e
3391 )
3392 )
3393 # consume data
3394 s.read()
3395
3396 # read(-1, buffer) is supported, even though read(-1) is not
3397 data = b"data"
3398 s.send(data)
3399 buffer = bytearray(len(data))
3400 self.assertEqual(s.read(-1, buffer), len(data))
3401 self.assertEqual(buffer, data)
3402
Christian Heimes888bbdc2017-09-07 14:18:21 -07003403 # sendall accepts bytes-like objects
3404 if ctypes is not None:
3405 ubyte = ctypes.c_ubyte * len(data)
3406 byteslike = ubyte.from_buffer_copy(data)
3407 s.sendall(byteslike)
3408 self.assertEqual(s.read(), data)
3409
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003410 # Make sure sendmsg et al are disallowed to avoid
3411 # inadvertent disclosure of data and/or corruption
3412 # of the encrypted data stream
3413 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3414 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3415 self.assertRaises(NotImplementedError,
3416 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003417 s.write(b"over\n")
3418
3419 self.assertRaises(ValueError, s.recv, -1)
3420 self.assertRaises(ValueError, s.read, -1)
3421
3422 s.close()
3423
3424 def test_recv_zero(self):
3425 server = ThreadedEchoServer(CERTFILE)
3426 server.__enter__()
3427 self.addCleanup(server.__exit__, None, None)
3428 s = socket.create_connection((HOST, server.port))
3429 self.addCleanup(s.close)
3430 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3431 self.addCleanup(s.close)
3432
3433 # recv/read(0) should return no data
3434 s.send(b"data")
3435 self.assertEqual(s.recv(0), b"")
3436 self.assertEqual(s.read(0), b"")
3437 self.assertEqual(s.read(), b"data")
3438
3439 # Should not block if the other end sends no data
3440 s.setblocking(False)
3441 self.assertEqual(s.recv(0), b"")
3442 self.assertEqual(s.recv_into(bytearray()), 0)
3443
3444 def test_nonblocking_send(self):
3445 server = ThreadedEchoServer(CERTFILE,
3446 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003447 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003448 cacerts=CERTFILE,
3449 chatty=True,
3450 connectionchatty=False)
3451 with server:
3452 s = test_wrap_socket(socket.socket(),
3453 server_side=False,
3454 certfile=CERTFILE,
3455 ca_certs=CERTFILE,
3456 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003457 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003458 s.connect((HOST, server.port))
3459 s.setblocking(False)
3460
3461 # If we keep sending data, at some point the buffers
3462 # will be full and the call will block
3463 buf = bytearray(8192)
3464 def fill_buffer():
3465 while True:
3466 s.send(buf)
3467 self.assertRaises((ssl.SSLWantWriteError,
3468 ssl.SSLWantReadError), fill_buffer)
3469
3470 # Now read all the output and discard it
3471 s.setblocking(True)
3472 s.close()
3473
3474 def test_handshake_timeout(self):
3475 # Issue #5103: SSL handshake must respect the socket timeout
3476 server = socket.socket(socket.AF_INET)
3477 host = "127.0.0.1"
3478 port = support.bind_port(server)
3479 started = threading.Event()
3480 finish = False
3481
3482 def serve():
3483 server.listen()
3484 started.set()
3485 conns = []
3486 while not finish:
3487 r, w, e = select.select([server], [], [], 0.1)
3488 if server in r:
3489 # Let the socket hang around rather than having
3490 # it closed by garbage collection.
3491 conns.append(server.accept()[0])
3492 for sock in conns:
3493 sock.close()
3494
3495 t = threading.Thread(target=serve)
3496 t.start()
3497 started.wait()
3498
3499 try:
3500 try:
3501 c = socket.socket(socket.AF_INET)
3502 c.settimeout(0.2)
3503 c.connect((host, port))
3504 # Will attempt handshake and time out
3505 self.assertRaisesRegex(socket.timeout, "timed out",
3506 test_wrap_socket, c)
3507 finally:
3508 c.close()
3509 try:
3510 c = socket.socket(socket.AF_INET)
3511 c = test_wrap_socket(c)
3512 c.settimeout(0.2)
3513 # Will attempt handshake and time out
3514 self.assertRaisesRegex(socket.timeout, "timed out",
3515 c.connect, (host, port))
3516 finally:
3517 c.close()
3518 finally:
3519 finish = True
3520 t.join()
3521 server.close()
3522
3523 def test_server_accept(self):
3524 # Issue #16357: accept() on a SSLSocket created through
3525 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003526 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003527 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003528 context.load_verify_locations(SIGNING_CA)
3529 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003530 server = socket.socket(socket.AF_INET)
3531 host = "127.0.0.1"
3532 port = support.bind_port(server)
3533 server = context.wrap_socket(server, server_side=True)
3534 self.assertTrue(server.server_side)
3535
3536 evt = threading.Event()
3537 remote = None
3538 peer = None
3539 def serve():
3540 nonlocal remote, peer
3541 server.listen()
3542 # Block on the accept and wait on the connection to close.
3543 evt.set()
3544 remote, peer = server.accept()
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003545 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003546
3547 t = threading.Thread(target=serve)
3548 t.start()
3549 # Client wait until server setup and perform a connect.
3550 evt.wait()
3551 client = context.wrap_socket(socket.socket())
3552 client.connect((host, port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003553 client.send(b'data')
3554 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003555 client_addr = client.getsockname()
3556 client.close()
3557 t.join()
3558 remote.close()
3559 server.close()
3560 # Sanity checks.
3561 self.assertIsInstance(remote, ssl.SSLSocket)
3562 self.assertEqual(peer, client_addr)
3563
3564 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003565 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003566 with context.wrap_socket(socket.socket()) as sock:
3567 with self.assertRaises(OSError) as cm:
3568 sock.getpeercert()
3569 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3570
3571 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003572 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003573 with context.wrap_socket(socket.socket()) as sock:
3574 with self.assertRaises(OSError) as cm:
3575 sock.do_handshake()
3576 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3577
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003578 def test_no_shared_ciphers(self):
3579 client_context, server_context, hostname = testing_context()
3580 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3581 client_context.options |= ssl.OP_NO_TLSv1_3
3582 # Force different suites on client and master
3583 client_context.set_ciphers("AES128")
3584 server_context.set_ciphers("AES256")
3585 with ThreadedEchoServer(context=server_context) as server:
3586 with client_context.wrap_socket(socket.socket(),
3587 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003588 with self.assertRaises(OSError):
3589 s.connect((HOST, server.port))
3590 self.assertIn("no shared cipher", server.conn_errors[0])
3591
3592 def test_version_basic(self):
3593 """
3594 Basic tests for SSLSocket.version().
3595 More tests are done in the test_protocol_*() methods.
3596 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003597 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3598 context.check_hostname = False
3599 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003600 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003601 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003602 chatty=False) as server:
3603 with context.wrap_socket(socket.socket()) as s:
3604 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003605 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003606 s.connect((HOST, server.port))
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003607 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003608 self.assertEqual(s.version(), 'TLSv1.3')
3609 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003610 self.assertEqual(s.version(), 'TLSv1.2')
3611 else: # 0.9.8 to 1.0.1
3612 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003613 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003614 self.assertIs(s.version(), None)
3615
Christian Heimescb5b68a2017-09-07 18:07:00 -07003616 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3617 "test requires TLSv1.3 enabled OpenSSL")
3618 def test_tls1_3(self):
3619 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3620 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003621 context.options |= (
3622 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3623 )
3624 with ThreadedEchoServer(context=context) as server:
3625 with context.wrap_socket(socket.socket()) as s:
3626 s.connect((HOST, server.port))
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003627 self.assertIn(s.cipher()[0], {
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07003628 'TLS_AES_256_GCM_SHA384',
3629 'TLS_CHACHA20_POLY1305_SHA256',
3630 'TLS_AES_128_GCM_SHA256',
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003631 })
3632 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003633
Miss Islington (bot)4c842b02018-02-27 03:41:04 -08003634 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3635 "required OpenSSL 1.1.0g")
3636 def test_min_max_version(self):
3637 client_context, server_context, hostname = testing_context()
3638 # client TLSv1.0 to 1.2
3639 client_context.minimum_version = ssl.TLSVersion.TLSv1
3640 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3641 # server only TLSv1.2
3642 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3643 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3644
3645 with ThreadedEchoServer(context=server_context) as server:
3646 with client_context.wrap_socket(socket.socket(),
3647 server_hostname=hostname) as s:
3648 s.connect((HOST, server.port))
3649 self.assertEqual(s.version(), 'TLSv1.2')
3650
3651 # client 1.0 to 1.2, server 1.0 to 1.1
3652 server_context.minimum_version = ssl.TLSVersion.TLSv1
3653 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3654
3655 with ThreadedEchoServer(context=server_context) as server:
3656 with client_context.wrap_socket(socket.socket(),
3657 server_hostname=hostname) as s:
3658 s.connect((HOST, server.port))
3659 self.assertEqual(s.version(), 'TLSv1.1')
3660
3661 # client 1.0, server 1.2 (mismatch)
3662 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3663 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3664 client_context.minimum_version = ssl.TLSVersion.TLSv1
3665 client_context.maximum_version = ssl.TLSVersion.TLSv1
3666 with ThreadedEchoServer(context=server_context) as server:
3667 with client_context.wrap_socket(socket.socket(),
3668 server_hostname=hostname) as s:
3669 with self.assertRaises(ssl.SSLError) as e:
3670 s.connect((HOST, server.port))
3671 self.assertIn("alert", str(e.exception))
3672
3673
3674 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3675 "required OpenSSL 1.1.0g")
3676 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3677 def test_min_max_version_sslv3(self):
3678 client_context, server_context, hostname = testing_context()
3679 server_context.minimum_version = ssl.TLSVersion.SSLv3
3680 client_context.minimum_version = ssl.TLSVersion.SSLv3
3681 client_context.maximum_version = ssl.TLSVersion.SSLv3
3682 with ThreadedEchoServer(context=server_context) as server:
3683 with client_context.wrap_socket(socket.socket(),
3684 server_hostname=hostname) as s:
3685 s.connect((HOST, server.port))
3686 self.assertEqual(s.version(), 'SSLv3')
3687
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003688 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3689 def test_default_ecdh_curve(self):
3690 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3691 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003692 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003693 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003694 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3695 # cipher name.
3696 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003697 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3698 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3699 # our default cipher list should prefer ECDH-based ciphers
3700 # automatically.
3701 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3702 context.set_ciphers("ECCdraft:ECDH")
3703 with ThreadedEchoServer(context=context) as server:
3704 with context.wrap_socket(socket.socket()) as s:
3705 s.connect((HOST, server.port))
3706 self.assertIn("ECDH", s.cipher()[0])
3707
3708 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3709 "'tls-unique' channel binding not available")
3710 def test_tls_unique_channel_binding(self):
3711 """Test tls-unique channel binding."""
3712 if support.verbose:
3713 sys.stdout.write("\n")
3714
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003715 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003716
3717 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003718 chatty=True,
3719 connectionchatty=False)
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003720
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003721 with server:
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003722 with client_context.wrap_socket(
3723 socket.socket(),
3724 server_hostname=hostname) as s:
3725 s.connect((HOST, server.port))
3726 # get the data
3727 cb_data = s.get_channel_binding("tls-unique")
3728 if support.verbose:
3729 sys.stdout.write(
3730 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003731
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003732 # check if it is sane
3733 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003734 if s.version() == 'TLSv1.3':
3735 self.assertEqual(len(cb_data), 48)
3736 else:
3737 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003738
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003739 # and compare with the peers version
3740 s.write(b"CB tls-unique\n")
3741 peer_data_repr = s.read().strip()
3742 self.assertEqual(peer_data_repr,
3743 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003744
3745 # now, again
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003746 with client_context.wrap_socket(
3747 socket.socket(),
3748 server_hostname=hostname) as s:
3749 s.connect((HOST, server.port))
3750 new_cb_data = s.get_channel_binding("tls-unique")
3751 if support.verbose:
3752 sys.stdout.write(
3753 "got another channel binding data: {0!r}\n".format(
3754 new_cb_data)
3755 )
3756 # is it really unique
3757 self.assertNotEqual(cb_data, new_cb_data)
3758 self.assertIsNotNone(cb_data)
Miss Islington (bot)72ef4fc2018-05-23 13:49:04 -07003759 if s.version() == 'TLSv1.3':
3760 self.assertEqual(len(cb_data), 48)
3761 else:
3762 self.assertEqual(len(cb_data), 12) # True for TLSv1
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003763 s.write(b"CB tls-unique\n")
3764 peer_data_repr = s.read().strip()
3765 self.assertEqual(peer_data_repr,
3766 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003767
3768 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003769 client_context, server_context, hostname = testing_context()
3770 stats = server_params_test(client_context, server_context,
3771 chatty=True, connectionchatty=True,
3772 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003773 if support.verbose:
3774 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3775 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3776
3777 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3778 "ssl.OP_NO_COMPRESSION needed for this test")
3779 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003780 client_context, server_context, hostname = testing_context()
3781 client_context.options |= ssl.OP_NO_COMPRESSION
3782 server_context.options |= ssl.OP_NO_COMPRESSION
3783 stats = server_params_test(client_context, server_context,
3784 chatty=True, connectionchatty=True,
3785 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003786 self.assertIs(stats['compression'], None)
3787
3788 def test_dh_params(self):
3789 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003790 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003791 # test scenario needs TLS <= 1.2
3792 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003793 server_context.load_dh_params(DHFILE)
3794 server_context.set_ciphers("kEDH")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003795 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003796 stats = server_params_test(client_context, server_context,
3797 chatty=True, connectionchatty=True,
3798 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003799 cipher = stats["cipher"][0]
3800 parts = cipher.split("-")
3801 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3802 self.fail("Non-DH cipher: " + cipher[0])
3803
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003804 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003805 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003806 def test_ecdh_curve(self):
3807 # server secp384r1, client auto
3808 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003809
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003810 server_context.set_ecdh_curve("secp384r1")
3811 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3812 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3813 stats = server_params_test(client_context, server_context,
3814 chatty=True, connectionchatty=True,
3815 sni_name=hostname)
3816
3817 # server auto, client secp384r1
3818 client_context, server_context, hostname = testing_context()
3819 client_context.set_ecdh_curve("secp384r1")
3820 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3821 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3822 stats = server_params_test(client_context, server_context,
3823 chatty=True, connectionchatty=True,
3824 sni_name=hostname)
3825
3826 # server / client curve mismatch
3827 client_context, server_context, hostname = testing_context()
3828 client_context.set_ecdh_curve("prime256v1")
3829 server_context.set_ecdh_curve("secp384r1")
3830 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3831 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3832 try:
3833 stats = server_params_test(client_context, server_context,
3834 chatty=True, connectionchatty=True,
3835 sni_name=hostname)
3836 except ssl.SSLError:
3837 pass
3838 else:
3839 # OpenSSL 1.0.2 does not fail although it should.
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003840 if IS_OPENSSL_1_1_0:
Miss Islington (bot)ff7528f2018-02-25 01:56:11 -08003841 self.fail("mismatch curve did not fail")
3842
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003843 def test_selected_alpn_protocol(self):
3844 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003845 client_context, server_context, hostname = testing_context()
3846 stats = server_params_test(client_context, server_context,
3847 chatty=True, connectionchatty=True,
3848 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003849 self.assertIs(stats['client_alpn_protocol'], None)
3850
3851 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3852 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3853 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003854 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003855 server_context.set_alpn_protocols(['foo', 'bar'])
3856 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003857 chatty=True, connectionchatty=True,
3858 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003859 self.assertIs(stats['client_alpn_protocol'], None)
3860
3861 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3862 def test_alpn_protocols(self):
3863 server_protocols = ['foo', 'bar', 'milkshake']
3864 protocol_tests = [
3865 (['foo', 'bar'], 'foo'),
3866 (['bar', 'foo'], 'foo'),
3867 (['milkshake'], 'milkshake'),
3868 (['http/3.0', 'http/4.0'], None)
3869 ]
3870 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003871 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003872 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003873 client_context.set_alpn_protocols(client_protocols)
3874
3875 try:
3876 stats = server_params_test(client_context,
3877 server_context,
3878 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003879 connectionchatty=True,
3880 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003881 except ssl.SSLError as e:
3882 stats = e
3883
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08003884 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003885 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3886 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3887 self.assertIsInstance(stats, ssl.SSLError)
3888 else:
3889 msg = "failed trying %s (s) and %s (c).\n" \
3890 "was expecting %s, but got %%s from the %%s" \
3891 % (str(server_protocols), str(client_protocols),
3892 str(expected))
3893 client_result = stats['client_alpn_protocol']
3894 self.assertEqual(client_result, expected,
3895 msg % (client_result, "client"))
3896 server_result = stats['server_alpn_protocols'][-1] \
3897 if len(stats['server_alpn_protocols']) else 'nothing'
3898 self.assertEqual(server_result, expected,
3899 msg % (server_result, "server"))
3900
3901 def test_selected_npn_protocol(self):
3902 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003903 client_context, server_context, hostname = testing_context()
3904 stats = server_params_test(client_context, server_context,
3905 chatty=True, connectionchatty=True,
3906 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003907 self.assertIs(stats['client_npn_protocol'], None)
3908
3909 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3910 def test_npn_protocols(self):
3911 server_protocols = ['http/1.1', 'spdy/2']
3912 protocol_tests = [
3913 (['http/1.1', 'spdy/2'], 'http/1.1'),
3914 (['spdy/2', 'http/1.1'], 'http/1.1'),
3915 (['spdy/2', 'test'], 'spdy/2'),
3916 (['abc', 'def'], 'abc')
3917 ]
3918 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003919 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003920 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003921 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003922 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003923 chatty=True, connectionchatty=True,
3924 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003925 msg = "failed trying %s (s) and %s (c).\n" \
3926 "was expecting %s, but got %%s from the %%s" \
3927 % (str(server_protocols), str(client_protocols),
3928 str(expected))
3929 client_result = stats['client_npn_protocol']
3930 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3931 server_result = stats['server_npn_protocols'][-1] \
3932 if len(stats['server_npn_protocols']) else 'nothing'
3933 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3934
3935 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003936 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003937 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003938 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003940 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003941 client_context.load_verify_locations(SIGNING_CA)
3942 return server_context, other_context, client_context
3943
3944 def check_common_name(self, stats, name):
3945 cert = stats['peercert']
3946 self.assertIn((('commonName', name),), cert['subject'])
3947
3948 @needs_sni
3949 def test_sni_callback(self):
3950 calls = []
3951 server_context, other_context, client_context = self.sni_contexts()
3952
Christian Heimesa170fa12017-09-15 20:27:30 +02003953 client_context.check_hostname = False
3954
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003955 def servername_cb(ssl_sock, server_name, initial_context):
3956 calls.append((server_name, initial_context))
3957 if server_name is not None:
3958 ssl_sock.context = other_context
3959 server_context.set_servername_callback(servername_cb)
3960
3961 stats = server_params_test(client_context, server_context,
3962 chatty=True,
3963 sni_name='supermessage')
3964 # The hostname was fetched properly, and the certificate was
3965 # changed for the connection.
3966 self.assertEqual(calls, [("supermessage", server_context)])
3967 # CERTFILE4 was selected
3968 self.check_common_name(stats, 'fakehostname')
3969
3970 calls = []
3971 # The callback is called with server_name=None
3972 stats = server_params_test(client_context, server_context,
3973 chatty=True,
3974 sni_name=None)
3975 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003976 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003977
3978 # Check disabling the callback
3979 calls = []
3980 server_context.set_servername_callback(None)
3981
3982 stats = server_params_test(client_context, server_context,
3983 chatty=True,
3984 sni_name='notfunny')
3985 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003986 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003987 self.assertEqual(calls, [])
3988
3989 @needs_sni
3990 def test_sni_callback_alert(self):
3991 # Returning a TLS alert is reflected to the connecting client
3992 server_context, other_context, client_context = self.sni_contexts()
3993
3994 def cb_returning_alert(ssl_sock, server_name, initial_context):
3995 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3996 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003997 with self.assertRaises(ssl.SSLError) as cm:
3998 stats = server_params_test(client_context, server_context,
3999 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004000 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004001 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004002
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004003 @needs_sni
4004 def test_sni_callback_raising(self):
4005 # Raising fails the connection with a TLS handshake failure alert.
4006 server_context, other_context, client_context = self.sni_contexts()
4007
4008 def cb_raising(ssl_sock, server_name, initial_context):
4009 1/0
4010 server_context.set_servername_callback(cb_raising)
4011
4012 with self.assertRaises(ssl.SSLError) as cm, \
4013 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004014 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004015 chatty=False,
4016 sni_name='supermessage')
4017 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4018 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004019
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004020 @needs_sni
4021 def test_sni_callback_wrong_return_type(self):
4022 # Returning the wrong return type terminates the TLS connection
4023 # with an internal error alert.
4024 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004025
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004026 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4027 return "foo"
4028 server_context.set_servername_callback(cb_wrong_return_type)
4029
4030 with self.assertRaises(ssl.SSLError) as cm, \
4031 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004032 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 chatty=False,
4034 sni_name='supermessage')
4035 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4036 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004037
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004038 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004039 client_context, server_context, hostname = testing_context()
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004040 client_context.set_ciphers("AES128:AES256")
4041 server_context.set_ciphers("AES256")
4042 expected_algs = [
4043 "AES256", "AES-256",
4044 # TLS 1.3 ciphers are always enabled
4045 "TLS_CHACHA20", "TLS_AES",
4046 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004047
Christian Heimesa170fa12017-09-15 20:27:30 +02004048 stats = server_params_test(client_context, server_context,
4049 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004050 ciphers = stats['server_shared_ciphers'][0]
4051 self.assertGreater(len(ciphers), 0)
4052 for name, tls_version, bits in ciphers:
Miss Islington (bot)cd57b482018-05-22 14:40:46 -07004053 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004054 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004055
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004056 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004057 client_context, server_context, hostname = testing_context()
4058 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004059
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004060 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004061 s = client_context.wrap_socket(socket.socket(),
4062 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004063 s.connect((HOST, server.port))
4064 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004065
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004066 self.assertRaises(ValueError, s.read, 1024)
4067 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004068
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004069 def test_sendfile(self):
4070 TEST_DATA = b"x" * 512
4071 with open(support.TESTFN, 'wb') as f:
4072 f.write(TEST_DATA)
4073 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004074 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004075 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004076 context.load_verify_locations(SIGNING_CA)
4077 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004078 server = ThreadedEchoServer(context=context, chatty=False)
4079 with server:
4080 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004081 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004082 with open(support.TESTFN, 'rb') as file:
4083 s.sendfile(file)
4084 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004086 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004087 client_context, server_context, hostname = testing_context()
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004088 # TODO: sessions aren't compatible with TLSv1.3 yet
4089 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004091 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004092 stats = server_params_test(client_context, server_context,
4093 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004094 session = stats['session']
4095 self.assertTrue(session.id)
4096 self.assertGreater(session.time, 0)
4097 self.assertGreater(session.timeout, 0)
4098 self.assertTrue(session.has_ticket)
4099 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4100 self.assertGreater(session.ticket_lifetime_hint, 0)
4101 self.assertFalse(stats['session_reused'])
4102 sess_stat = server_context.session_stats()
4103 self.assertEqual(sess_stat['accept'], 1)
4104 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004105
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004106 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004107 stats = server_params_test(client_context, server_context,
4108 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004109 sess_stat = server_context.session_stats()
4110 self.assertEqual(sess_stat['accept'], 2)
4111 self.assertEqual(sess_stat['hits'], 1)
4112 self.assertTrue(stats['session_reused'])
4113 session2 = stats['session']
4114 self.assertEqual(session2.id, session.id)
4115 self.assertEqual(session2, session)
4116 self.assertIsNot(session2, session)
4117 self.assertGreaterEqual(session2.time, session.time)
4118 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004119
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004120 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004121 stats = server_params_test(client_context, server_context,
4122 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004123 self.assertFalse(stats['session_reused'])
4124 session3 = stats['session']
4125 self.assertNotEqual(session3.id, session.id)
4126 self.assertNotEqual(session3, session)
4127 sess_stat = server_context.session_stats()
4128 self.assertEqual(sess_stat['accept'], 3)
4129 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004130
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004131 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004132 stats = server_params_test(client_context, server_context,
4133 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004134 self.assertTrue(stats['session_reused'])
4135 session4 = stats['session']
4136 self.assertEqual(session4.id, session.id)
4137 self.assertEqual(session4, session)
4138 self.assertGreaterEqual(session4.time, session.time)
4139 self.assertGreaterEqual(session4.timeout, session.timeout)
4140 sess_stat = server_context.session_stats()
4141 self.assertEqual(sess_stat['accept'], 4)
4142 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004143
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004144 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004145 client_context, server_context, hostname = testing_context()
4146 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004147
Miss Islington (bot)2614ed42018-02-27 00:17:49 -08004148 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004149 client_context.options |= ssl.OP_NO_TLSv1_3
4150 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004151
Christian Heimesa170fa12017-09-15 20:27:30 +02004152 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004153 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004154 with client_context.wrap_socket(socket.socket(),
4155 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004156 # session is None before handshake
4157 self.assertEqual(s.session, None)
4158 self.assertEqual(s.session_reused, None)
4159 s.connect((HOST, server.port))
4160 session = s.session
4161 self.assertTrue(session)
4162 with self.assertRaises(TypeError) as e:
4163 s.session = object
Miss Islington (bot)42198572018-06-11 17:58:06 -07004164 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004165
Christian Heimesa170fa12017-09-15 20:27:30 +02004166 with client_context.wrap_socket(socket.socket(),
4167 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004168 s.connect((HOST, server.port))
4169 # cannot set session after handshake
4170 with self.assertRaises(ValueError) as e:
4171 s.session = session
4172 self.assertEqual(str(e.exception),
4173 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004174
Christian Heimesa170fa12017-09-15 20:27:30 +02004175 with client_context.wrap_socket(socket.socket(),
4176 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004177 # can set session before handshake and before the
4178 # connection was established
4179 s.session = session
4180 s.connect((HOST, server.port))
4181 self.assertEqual(s.session.id, session.id)
4182 self.assertEqual(s.session, session)
4183 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004184
Christian Heimesa170fa12017-09-15 20:27:30 +02004185 with client_context2.wrap_socket(socket.socket(),
4186 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004187 # cannot re-use session with a different SSLContext
4188 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004189 s.session = session
4190 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004191 self.assertEqual(str(e.exception),
4192 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004193
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004194
Christian Heimes2756ef32018-09-23 09:22:52 +02004195@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4196class TestPostHandshakeAuth(unittest.TestCase):
4197 def test_pha_setter(self):
4198 protocols = [
4199 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4200 ]
4201 for protocol in protocols:
4202 ctx = ssl.SSLContext(protocol)
4203 self.assertEqual(ctx.post_handshake_auth, False)
4204
4205 ctx.post_handshake_auth = True
4206 self.assertEqual(ctx.post_handshake_auth, True)
4207
4208 ctx.verify_mode = ssl.CERT_REQUIRED
4209 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4210 self.assertEqual(ctx.post_handshake_auth, True)
4211
4212 ctx.post_handshake_auth = False
4213 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4214 self.assertEqual(ctx.post_handshake_auth, False)
4215
4216 ctx.verify_mode = ssl.CERT_OPTIONAL
4217 ctx.post_handshake_auth = True
4218 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4219 self.assertEqual(ctx.post_handshake_auth, True)
4220
4221 def test_pha_required(self):
4222 client_context, server_context, hostname = testing_context()
4223 server_context.post_handshake_auth = True
4224 server_context.verify_mode = ssl.CERT_REQUIRED
4225 client_context.post_handshake_auth = True
4226 client_context.load_cert_chain(SIGNED_CERTFILE)
4227
4228 server = ThreadedEchoServer(context=server_context, chatty=False)
4229 with server:
4230 with client_context.wrap_socket(socket.socket(),
4231 server_hostname=hostname) as s:
4232 s.connect((HOST, server.port))
4233 s.write(b'HASCERT')
4234 self.assertEqual(s.recv(1024), b'FALSE\n')
4235 s.write(b'PHA')
4236 self.assertEqual(s.recv(1024), b'OK\n')
4237 s.write(b'HASCERT')
4238 self.assertEqual(s.recv(1024), b'TRUE\n')
4239 # PHA method just returns true when cert is already available
4240 s.write(b'PHA')
4241 self.assertEqual(s.recv(1024), b'OK\n')
4242 s.write(b'GETCERT')
4243 cert_text = s.recv(4096).decode('us-ascii')
4244 self.assertIn('Python Software Foundation CA', cert_text)
4245
4246 def test_pha_required_nocert(self):
4247 client_context, server_context, hostname = testing_context()
4248 server_context.post_handshake_auth = True
4249 server_context.verify_mode = ssl.CERT_REQUIRED
4250 client_context.post_handshake_auth = True
4251
4252 server = ThreadedEchoServer(context=server_context, chatty=False)
4253 with server:
4254 with client_context.wrap_socket(socket.socket(),
4255 server_hostname=hostname) as s:
4256 s.connect((HOST, server.port))
4257 s.write(b'PHA')
4258 # receive CertificateRequest
4259 self.assertEqual(s.recv(1024), b'OK\n')
4260 # send empty Certificate + Finish
4261 s.write(b'HASCERT')
4262 # receive alert
4263 with self.assertRaisesRegex(
4264 ssl.SSLError,
4265 'tlsv13 alert certificate required'):
4266 s.recv(1024)
4267
4268 def test_pha_optional(self):
4269 if support.verbose:
4270 sys.stdout.write("\n")
4271
4272 client_context, server_context, hostname = testing_context()
4273 server_context.post_handshake_auth = True
4274 server_context.verify_mode = ssl.CERT_REQUIRED
4275 client_context.post_handshake_auth = True
4276 client_context.load_cert_chain(SIGNED_CERTFILE)
4277
4278 # check CERT_OPTIONAL
4279 server_context.verify_mode = ssl.CERT_OPTIONAL
4280 server = ThreadedEchoServer(context=server_context, chatty=False)
4281 with server:
4282 with client_context.wrap_socket(socket.socket(),
4283 server_hostname=hostname) as s:
4284 s.connect((HOST, server.port))
4285 s.write(b'HASCERT')
4286 self.assertEqual(s.recv(1024), b'FALSE\n')
4287 s.write(b'PHA')
4288 self.assertEqual(s.recv(1024), b'OK\n')
4289 s.write(b'HASCERT')
4290 self.assertEqual(s.recv(1024), b'TRUE\n')
4291
4292 def test_pha_optional_nocert(self):
4293 if support.verbose:
4294 sys.stdout.write("\n")
4295
4296 client_context, server_context, hostname = testing_context()
4297 server_context.post_handshake_auth = True
4298 server_context.verify_mode = ssl.CERT_OPTIONAL
4299 client_context.post_handshake_auth = True
4300
4301 server = ThreadedEchoServer(context=server_context, chatty=False)
4302 with server:
4303 with client_context.wrap_socket(socket.socket(),
4304 server_hostname=hostname) as s:
4305 s.connect((HOST, server.port))
4306 s.write(b'HASCERT')
4307 self.assertEqual(s.recv(1024), b'FALSE\n')
4308 s.write(b'PHA')
4309 self.assertEqual(s.recv(1024), b'OK\n')
4310 # optional doens't fail when client does not have a cert
4311 s.write(b'HASCERT')
4312 self.assertEqual(s.recv(1024), b'FALSE\n')
4313
4314 def test_pha_no_pha_client(self):
4315 client_context, server_context, hostname = testing_context()
4316 server_context.post_handshake_auth = True
4317 server_context.verify_mode = ssl.CERT_REQUIRED
4318 client_context.load_cert_chain(SIGNED_CERTFILE)
4319
4320 server = ThreadedEchoServer(context=server_context, chatty=False)
4321 with server:
4322 with client_context.wrap_socket(socket.socket(),
4323 server_hostname=hostname) as s:
4324 s.connect((HOST, server.port))
4325 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4326 s.verify_client_post_handshake()
4327 s.write(b'PHA')
4328 self.assertIn(b'extension not received', s.recv(1024))
4329
4330 def test_pha_no_pha_server(self):
4331 # server doesn't have PHA enabled, cert is requested in handshake
4332 client_context, server_context, hostname = testing_context()
4333 server_context.verify_mode = ssl.CERT_REQUIRED
4334 client_context.post_handshake_auth = True
4335 client_context.load_cert_chain(SIGNED_CERTFILE)
4336
4337 server = ThreadedEchoServer(context=server_context, chatty=False)
4338 with server:
4339 with client_context.wrap_socket(socket.socket(),
4340 server_hostname=hostname) as s:
4341 s.connect((HOST, server.port))
4342 s.write(b'HASCERT')
4343 self.assertEqual(s.recv(1024), b'TRUE\n')
4344 # PHA doesn't fail if there is already a cert
4345 s.write(b'PHA')
4346 self.assertEqual(s.recv(1024), b'OK\n')
4347 s.write(b'HASCERT')
4348 self.assertEqual(s.recv(1024), b'TRUE\n')
4349
4350 def test_pha_not_tls13(self):
4351 # TLS 1.2
4352 client_context, server_context, hostname = testing_context()
4353 server_context.verify_mode = ssl.CERT_REQUIRED
4354 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4355 client_context.post_handshake_auth = True
4356 client_context.load_cert_chain(SIGNED_CERTFILE)
4357
4358 server = ThreadedEchoServer(context=server_context, chatty=False)
4359 with server:
4360 with client_context.wrap_socket(socket.socket(),
4361 server_hostname=hostname) as s:
4362 s.connect((HOST, server.port))
4363 # PHA fails for TLS != 1.3
4364 s.write(b'PHA')
4365 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4366
4367
Thomas Woutersed03b412007-08-28 21:37:11 +00004368def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004369 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004370 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004371 plats = {
4372 'Linux': platform.linux_distribution,
4373 'Mac': platform.mac_ver,
4374 'Windows': platform.win32_ver,
4375 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03004376 with warnings.catch_warnings():
4377 warnings.filterwarnings(
4378 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04004379 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03004380 'functions are deprecated .*',
Xtreakf5770f32018-07-05 22:29:46 +05304381 DeprecationWarning,
Berker Peksag9e7990a2015-05-16 23:21:26 +03004382 )
4383 for name, func in plats.items():
4384 plat = func()
4385 if plat and plat[0]:
4386 plat = '%s %r' % (name, plat)
4387 break
4388 else:
4389 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004390 print("test_ssl: testing with %r %r" %
4391 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4392 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004393 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004394 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4395 try:
4396 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4397 except AttributeError:
4398 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004399
Antoine Pitrou152efa22010-05-16 18:19:27 +00004400 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004401 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004402 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004403 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004404 BADCERT, BADKEY, EMPTYCERT]:
4405 if not os.path.exists(filename):
4406 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004407
Martin Panter3840b2a2016-03-27 01:53:46 +00004408 tests = [
4409 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes89c20512018-02-27 11:17:32 +01004410 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes2756ef32018-09-23 09:22:52 +02004411 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004412 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004413
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004414 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004415 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004416
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004417 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004418 try:
4419 support.run_unittest(*tests)
4420 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004421 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004422
4423if __name__ == "__main__":
4424 test_main()