blob: c72a85710d5cb8c0a9f24df5d9684a94d513c52d [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Christian Heimesc7f70692019-05-31 11:44:05 +020029from ssl import TLSVersion, _TLSContentType, _TLSMessageType, _TLSAlertType
Martin Panter3840b2a2016-03-27 01:53:46 +000030
Paul Monsonf3550692019-06-19 13:09:54 -070031Py_DEBUG = hasattr(sys, 'gettotalrefcount')
32Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'
33
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010034PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020036IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010037IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
38IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010039PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000040
Victor Stinner3ef63442019-02-19 18:06:03 +010041PROTOCOL_TO_TLS_VERSION = {}
42for proto, ver in (
43 ("PROTOCOL_SSLv23", "SSLv3"),
44 ("PROTOCOL_TLSv1", "TLSv1"),
45 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
46):
47 try:
48 proto = getattr(ssl, proto)
49 ver = getattr(ssl.TLSVersion, ver)
50 except AttributeError:
51 continue
52 PROTOCOL_TO_TLS_VERSION[proto] = ver
53
Christian Heimesefff7062013-11-21 03:35:02 +010054def data_file(*name):
55 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000056
Antoine Pitrou81564092010-10-08 23:06:24 +000057# The custom key and certificate files used in test_ssl are generated
58# using Lib/test/make_ssl_certs.py.
59# Other certificates are simply fetched from the Internet servers they
60# are meant to authenticate.
61
Antoine Pitrou152efa22010-05-16 18:19:27 +000062CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000063BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000064ONLYCERT = data_file("ssl_cert.pem")
65ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000066BYTES_ONLYCERT = os.fsencode(ONLYCERT)
67BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020068CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
69ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
70KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000071CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000072BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010073CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
74CAFILE_CACERT = data_file("capath", "5ed36f99.0")
75
Christian Heimesbd5c7d22018-01-20 15:16:30 +010076CERTFILE_INFO = {
77 'issuer': ((('countryName', 'XY'),),
78 (('localityName', 'Castle Anthrax'),),
79 (('organizationName', 'Python Software Foundation'),),
80 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020081 'notAfter': 'Aug 26 14:23:15 2028 GMT',
82 'notBefore': 'Aug 29 14:23:15 2018 GMT',
83 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010084 'subject': ((('countryName', 'XY'),),
85 (('localityName', 'Castle Anthrax'),),
86 (('organizationName', 'Python Software Foundation'),),
87 (('commonName', 'localhost'),)),
88 'subjectAltName': (('DNS', 'localhost'),),
89 'version': 3
90}
Antoine Pitrou152efa22010-05-16 18:19:27 +000091
Christian Heimes22587792013-11-21 23:56:13 +010092# empty CRL
93CRLFILE = data_file("revocation.crl")
94
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010095# Two keys and certs signed by the same CA (for SNI tests)
96SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020097SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010098
99SIGNED_CERTFILE_INFO = {
100 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
101 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
102 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
103 'issuer': ((('countryName', 'XY'),),
104 (('organizationName', 'Python Software Foundation CA'),),
105 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200106 'notAfter': 'Jul 7 14:23:16 2028 GMT',
107 'notBefore': 'Aug 29 14:23:16 2018 GMT',
108 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100109 'subject': ((('countryName', 'XY'),),
110 (('localityName', 'Castle Anthrax'),),
111 (('organizationName', 'Python Software Foundation'),),
112 (('commonName', 'localhost'),)),
113 'subjectAltName': (('DNS', 'localhost'),),
114 'version': 3
115}
116
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100117SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200118SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100119SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
120SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
121
Martin Panter3840b2a2016-03-27 01:53:46 +0000122# Same certificate as pycacert.pem, but without extra text in file
123SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200124# cert with all kinds of subject alt names
125ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100126IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100127
Martin Panter3d81d932016-01-14 09:36:00 +0000128REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000129
130EMPTYCERT = data_file("nullcert.pem")
131BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000132NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000133BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200134NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200135NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100136TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000137
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200138DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100139BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000140
Christian Heimes358cfd42016-09-10 22:43:48 +0200141# Not defined in all versions of OpenSSL
142OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
143OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
144OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
145OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100146OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200147
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100148
Thomas Woutersed03b412007-08-28 21:37:11 +0000149def handle_error(prefix):
150 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000151 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000152 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000153
Antoine Pitroub5218772010-05-21 09:56:06 +0000154def can_clear_options():
155 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200156 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000157
158def no_sslv2_implies_sslv3_hello():
159 # 0.9.7h or higher
160 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
161
Christian Heimes2427b502013-11-23 11:24:32 +0100162def have_verify_flags():
163 # 0.9.8 or higher
164 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
165
Christian Heimesb7b92252018-02-25 09:49:31 +0100166def _have_secp_curves():
167 if not ssl.HAS_ECDH:
168 return False
169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
170 try:
171 ctx.set_ecdh_curve("secp384r1")
172 except ValueError:
173 return False
174 else:
175 return True
176
177
178HAVE_SECP_CURVES = _have_secp_curves()
179
180
Antoine Pitrouc695c952014-04-28 20:57:36 +0200181def utc_offset(): #NOTE: ignore issues like #1647654
182 # local time = utc time + utc offset
183 if time.daylight and time.localtime().tm_isdst > 0:
184 return -time.altzone # seconds
185 return -time.timezone
186
Christian Heimes9424bb42013-06-17 15:32:57 +0200187def asn1time(cert_time):
188 # Some versions of OpenSSL ignore seconds, see #18207
189 # 0.9.8.i
190 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
191 fmt = "%b %d %H:%M:%S %Y GMT"
192 dt = datetime.datetime.strptime(cert_time, fmt)
193 dt = dt.replace(second=0)
194 cert_time = dt.strftime(fmt)
195 # %d adds leading zero but ASN1_TIME_print() uses leading space
196 if cert_time[4] == "0":
197 cert_time = cert_time[:4] + " " + cert_time[5:]
198
199 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000200
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100201needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
202
Antoine Pitrou23df4832010-08-04 17:14:06 +0000203
Christian Heimesd0486372016-09-10 23:23:33 +0200204def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
205 cert_reqs=ssl.CERT_NONE, ca_certs=None,
206 ciphers=None, certfile=None, keyfile=None,
207 **kwargs):
208 context = ssl.SSLContext(ssl_version)
209 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200210 if cert_reqs == ssl.CERT_NONE:
211 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200212 context.verify_mode = cert_reqs
213 if ca_certs is not None:
214 context.load_verify_locations(ca_certs)
215 if certfile is not None or keyfile is not None:
216 context.load_cert_chain(certfile, keyfile)
217 if ciphers is not None:
218 context.set_ciphers(ciphers)
219 return context.wrap_socket(sock, **kwargs)
220
Christian Heimesa170fa12017-09-15 20:27:30 +0200221
222def testing_context(server_cert=SIGNED_CERTFILE):
223 """Create context
224
225 client_context, server_context, hostname = testing_context()
226 """
227 if server_cert == SIGNED_CERTFILE:
228 hostname = SIGNED_CERTFILE_HOSTNAME
229 elif server_cert == SIGNED_CERTFILE2:
230 hostname = SIGNED_CERTFILE2_HOSTNAME
231 else:
232 raise ValueError(server_cert)
233
234 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
235 client_context.load_verify_locations(SIGNING_CA)
236
237 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
238 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200239 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200240
241 return client_context, server_context, hostname
242
243
Antoine Pitrou152efa22010-05-16 18:19:27 +0000244class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000245
Antoine Pitrou480a1242010-04-28 21:37:09 +0000246 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000247 ssl.CERT_NONE
248 ssl.CERT_OPTIONAL
249 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100250 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100251 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100252 if ssl.HAS_ECDH:
253 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100254 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
255 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000256 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100257 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700258 ssl.OP_NO_SSLv2
259 ssl.OP_NO_SSLv3
260 ssl.OP_NO_TLSv1
261 ssl.OP_NO_TLSv1_3
262 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
263 ssl.OP_NO_TLSv1_1
264 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200265 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000266
Christian Heimes9d50ab52018-02-27 10:17:30 +0100267 def test_private_init(self):
268 with self.assertRaisesRegex(TypeError, "public constructor"):
269 with socket.socket() as s:
270 ssl.SSLSocket(s)
271
Antoine Pitrou172f0252014-04-18 20:33:08 +0200272 def test_str_for_enums(self):
273 # Make sure that the PROTOCOL_* constants have enum-like string
274 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200275 proto = ssl.PROTOCOL_TLS
276 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200277 ctx = ssl.SSLContext(proto)
278 self.assertIs(ctx.protocol, proto)
279
Antoine Pitrou480a1242010-04-28 21:37:09 +0000280 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000282 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000283 sys.stdout.write("\n RAND_status is %d (%s)\n"
284 % (v, (v and "sufficient randomness") or
285 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200286
287 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
288 self.assertEqual(len(data), 16)
289 self.assertEqual(is_cryptographic, v == 1)
290 if v:
291 data = ssl.RAND_bytes(16)
292 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200293 else:
294 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200295
Victor Stinner1e81a392013-12-19 16:47:04 +0100296 # negative num is invalid
297 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
298 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
299
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100300 if hasattr(ssl, 'RAND_egd'):
301 self.assertRaises(TypeError, ssl.RAND_egd, 1)
302 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000303 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200304 ssl.RAND_add(b"this is a random bytes object", 75.0)
305 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000306
Christian Heimesf77b4b22013-08-21 13:26:05 +0200307 @unittest.skipUnless(os.name == 'posix', 'requires posix')
308 def test_random_fork(self):
309 status = ssl.RAND_status()
310 if not status:
311 self.fail("OpenSSL's PRNG has insufficient randomness")
312
313 rfd, wfd = os.pipe()
314 pid = os.fork()
315 if pid == 0:
316 try:
317 os.close(rfd)
318 child_random = ssl.RAND_pseudo_bytes(16)[0]
319 self.assertEqual(len(child_random), 16)
320 os.write(wfd, child_random)
321 os.close(wfd)
322 except BaseException:
323 os._exit(1)
324 else:
325 os._exit(0)
326 else:
327 os.close(wfd)
328 self.addCleanup(os.close, rfd)
329 _, status = os.waitpid(pid, 0)
330 self.assertEqual(status, 0)
331
332 child_random = os.read(rfd, 16)
333 self.assertEqual(len(child_random), 16)
334 parent_random = ssl.RAND_pseudo_bytes(16)[0]
335 self.assertEqual(len(parent_random), 16)
336
337 self.assertNotEqual(child_random, parent_random)
338
Christian Heimese6dac002018-08-30 07:25:49 +0200339 maxDiff = None
340
Antoine Pitrou480a1242010-04-28 21:37:09 +0000341 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000342 # note that this uses an 'unofficial' function in _ssl.c,
343 # provided solely for this test, to exercise the certificate
344 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100345 self.assertEqual(
346 ssl._ssl._test_decode_cert(CERTFILE),
347 CERTFILE_INFO
348 )
349 self.assertEqual(
350 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
351 SIGNED_CERTFILE_INFO
352 )
353
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200354 # Issue #13034: the subjectAltName in some certificates
355 # (notably projects.developer.nokia.com:443) wasn't parsed
356 p = ssl._ssl._test_decode_cert(NOKIACERT)
357 if support.verbose:
358 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
359 self.assertEqual(p['subjectAltName'],
360 (('DNS', 'projects.developer.nokia.com'),
361 ('DNS', 'projects.forum.nokia.com'))
362 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100363 # extra OCSP and AIA fields
364 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
365 self.assertEqual(p['caIssuers'],
366 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
367 self.assertEqual(p['crlDistributionPoints'],
368 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000369
Christian Heimesa37f5242019-01-15 23:47:42 +0100370 def test_parse_cert_CVE_2019_5010(self):
371 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
372 if support.verbose:
373 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
374 self.assertEqual(
375 p,
376 {
377 'issuer': (
378 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
379 'notAfter': 'Jun 14 18:00:58 2028 GMT',
380 'notBefore': 'Jun 18 18:00:58 2018 GMT',
381 'serialNumber': '02',
382 'subject': ((('countryName', 'UK'),),
383 (('commonName',
384 'codenomicon-vm-2.test.lal.cisco.com'),)),
385 'subjectAltName': (
386 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
387 'version': 3
388 }
389 )
390
Christian Heimes824f7f32013-08-17 00:54:47 +0200391 def test_parse_cert_CVE_2013_4238(self):
392 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
393 if support.verbose:
394 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
395 subject = ((('countryName', 'US'),),
396 (('stateOrProvinceName', 'Oregon'),),
397 (('localityName', 'Beaverton'),),
398 (('organizationName', 'Python Software Foundation'),),
399 (('organizationalUnitName', 'Python Core Development'),),
400 (('commonName', 'null.python.org\x00example.org'),),
401 (('emailAddress', 'python-dev@python.org'),))
402 self.assertEqual(p['subject'], subject)
403 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200404 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
405 san = (('DNS', 'altnull.python.org\x00example.com'),
406 ('email', 'null@python.org\x00user@example.org'),
407 ('URI', 'http://null.python.org\x00http://example.org'),
408 ('IP Address', '192.0.2.1'),
409 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
410 else:
411 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
412 san = (('DNS', 'altnull.python.org\x00example.com'),
413 ('email', 'null@python.org\x00user@example.org'),
414 ('URI', 'http://null.python.org\x00http://example.org'),
415 ('IP Address', '192.0.2.1'),
416 ('IP Address', '<invalid>'))
417
418 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200419
Christian Heimes1c03abd2016-09-06 23:25:35 +0200420 def test_parse_all_sans(self):
421 p = ssl._ssl._test_decode_cert(ALLSANFILE)
422 self.assertEqual(p['subjectAltName'],
423 (
424 ('DNS', 'allsans'),
425 ('othername', '<unsupported>'),
426 ('othername', '<unsupported>'),
427 ('email', 'user@example.org'),
428 ('DNS', 'www.example.org'),
429 ('DirName',
430 ((('countryName', 'XY'),),
431 (('localityName', 'Castle Anthrax'),),
432 (('organizationName', 'Python Software Foundation'),),
433 (('commonName', 'dirname example'),))),
434 ('URI', 'https://www.python.org/'),
435 ('IP Address', '127.0.0.1'),
436 ('IP Address', '0:0:0:0:0:0:0:1\n'),
437 ('Registered ID', '1.2.3.4.5')
438 )
439 )
440
Antoine Pitrou480a1242010-04-28 21:37:09 +0000441 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000442 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000443 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000444 d1 = ssl.PEM_cert_to_DER_cert(pem)
445 p2 = ssl.DER_cert_to_PEM_cert(d1)
446 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000447 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000448 if not p2.startswith(ssl.PEM_HEADER + '\n'):
449 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
450 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
451 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000452
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000453 def test_openssl_version(self):
454 n = ssl.OPENSSL_VERSION_NUMBER
455 t = ssl.OPENSSL_VERSION_INFO
456 s = ssl.OPENSSL_VERSION
457 self.assertIsInstance(n, int)
458 self.assertIsInstance(t, tuple)
459 self.assertIsInstance(s, str)
460 # Some sanity checks follow
461 # >= 0.9
462 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400463 # < 3.0
464 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000465 major, minor, fix, patch, status = t
466 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400467 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000468 self.assertGreaterEqual(minor, 0)
469 self.assertLess(minor, 256)
470 self.assertGreaterEqual(fix, 0)
471 self.assertLess(fix, 256)
472 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100473 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000474 self.assertGreaterEqual(status, 0)
475 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400476 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200477 if IS_LIBRESSL:
478 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100479 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400480 else:
481 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100482 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000483
Antoine Pitrou9d543662010-04-23 23:10:32 +0000484 @support.cpython_only
485 def test_refcycle(self):
486 # Issue #7943: an SSL object doesn't create reference cycles with
487 # itself.
488 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200489 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000490 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100491 with support.check_warnings(("", ResourceWarning)):
492 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100493 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000494
Antoine Pitroua468adc2010-09-14 14:43:44 +0000495 def test_wrapped_unconnected(self):
496 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200497 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000498 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200499 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100500 self.assertRaises(OSError, ss.recv, 1)
501 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
502 self.assertRaises(OSError, ss.recvfrom, 1)
503 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
504 self.assertRaises(OSError, ss.send, b'x')
505 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200506 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100507 self.assertRaises(NotImplementedError, ss.sendmsg,
508 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200509 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
510 self.assertRaises(NotImplementedError, ss.recvmsg_into,
511 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000512
Antoine Pitrou40f08742010-04-24 22:04:40 +0000513 def test_timeout(self):
514 # Issue #8524: when creating an SSL socket, the timeout of the
515 # original socket should be retained.
516 for timeout in (None, 0.0, 5.0):
517 s = socket.socket(socket.AF_INET)
518 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200519 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100520 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000521
Christian Heimesd0486372016-09-10 23:23:33 +0200522 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000523 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000524 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000525 "certfile must be specified",
526 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000527 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000528 "certfile must be specified for server-side operations",
529 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000530 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000531 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200532 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100533 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
534 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200535 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200536 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000537 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000538 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000539 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200540 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000541 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000542 ssl.wrap_socket(sock,
543 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000544 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200545 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000546 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000547 ssl.wrap_socket(sock,
548 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000549 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000550
Martin Panter3464ea22016-02-01 21:58:11 +0000551 def bad_cert_test(self, certfile):
552 """Check that trying to use the given client certificate fails"""
553 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
554 certfile)
555 sock = socket.socket()
556 self.addCleanup(sock.close)
557 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200558 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200559 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000560
561 def test_empty_cert(self):
562 """Wrapping with an empty cert file"""
563 self.bad_cert_test("nullcert.pem")
564
565 def test_malformed_cert(self):
566 """Wrapping with a badly formatted certificate (syntax error)"""
567 self.bad_cert_test("badcert.pem")
568
569 def test_malformed_key(self):
570 """Wrapping with a badly formatted key (syntax error)"""
571 self.bad_cert_test("badkey.pem")
572
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000573 def test_match_hostname(self):
574 def ok(cert, hostname):
575 ssl.match_hostname(cert, hostname)
576 def fail(cert, hostname):
577 self.assertRaises(ssl.CertificateError,
578 ssl.match_hostname, cert, hostname)
579
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100580 # -- Hostname matching --
581
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000582 cert = {'subject': ((('commonName', 'example.com'),),)}
583 ok(cert, 'example.com')
584 ok(cert, 'ExAmple.cOm')
585 fail(cert, 'www.example.com')
586 fail(cert, '.example.com')
587 fail(cert, 'example.org')
588 fail(cert, 'exampleXcom')
589
590 cert = {'subject': ((('commonName', '*.a.com'),),)}
591 ok(cert, 'foo.a.com')
592 fail(cert, 'bar.foo.a.com')
593 fail(cert, 'a.com')
594 fail(cert, 'Xa.com')
595 fail(cert, '.a.com')
596
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 # only match wildcards when they are the only thing
598 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530600 fail(cert, 'foo.com')
601 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000602 fail(cert, 'bar.com')
603 fail(cert, 'foo.a.com')
604 fail(cert, 'bar.foo.com')
605
Christian Heimes824f7f32013-08-17 00:54:47 +0200606 # NULL bytes are bad, CVE-2013-4073
607 cert = {'subject': ((('commonName',
608 'null.python.org\x00example.org'),),)}
609 ok(cert, 'null.python.org\x00example.org') # or raise an error?
610 fail(cert, 'example.org')
611 fail(cert, 'null.python.org')
612
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 # error cases with wildcards
614 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
615 fail(cert, 'bar.foo.a.com')
616 fail(cert, 'a.com')
617 fail(cert, 'Xa.com')
618 fail(cert, '.a.com')
619
620 cert = {'subject': ((('commonName', 'a.*.com'),),)}
621 fail(cert, 'a.foo.com')
622 fail(cert, 'a..com')
623 fail(cert, 'a.com')
624
625 # wildcard doesn't match IDNA prefix 'xn--'
626 idna = 'püthon.python.org'.encode("idna").decode("ascii")
627 cert = {'subject': ((('commonName', idna),),)}
628 ok(cert, idna)
629 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
630 fail(cert, idna)
631 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
632 fail(cert, idna)
633
634 # wildcard in first fragment and IDNA A-labels in sequent fragments
635 # are supported.
636 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
637 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530638 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
639 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100640 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
641 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
642
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000643 # Slightly fake real-world example
644 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
645 'subject': ((('commonName', 'linuxfrz.org'),),),
646 'subjectAltName': (('DNS', 'linuxfr.org'),
647 ('DNS', 'linuxfr.com'),
648 ('othername', '<unsupported>'))}
649 ok(cert, 'linuxfr.org')
650 ok(cert, 'linuxfr.com')
651 # Not a "DNS" entry
652 fail(cert, '<unsupported>')
653 # When there is a subjectAltName, commonName isn't used
654 fail(cert, 'linuxfrz.org')
655
656 # A pristine real-world example
657 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('organizationName', 'Google Inc'),),
662 (('commonName', 'mail.google.com'),))}
663 ok(cert, 'mail.google.com')
664 fail(cert, 'gmail.com')
665 # Only commonName is considered
666 fail(cert, 'California')
667
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100668 # -- IPv4 matching --
669 cert = {'subject': ((('commonName', 'example.com'),),),
670 'subjectAltName': (('DNS', 'example.com'),
671 ('IP Address', '10.11.12.13'),
672 ('IP Address', '14.15.16.17'))}
673 ok(cert, '10.11.12.13')
674 ok(cert, '14.15.16.17')
675 fail(cert, '14.15.16.18')
676 fail(cert, 'example.net')
677
678 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100679 if hasattr(socket, 'AF_INET6'):
680 cert = {'subject': ((('commonName', 'example.com'),),),
681 'subjectAltName': (
682 ('DNS', 'example.com'),
683 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
684 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
685 ok(cert, '2001::cafe')
686 ok(cert, '2003::baba')
687 fail(cert, '2003::bebe')
688 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100689
690 # -- Miscellaneous --
691
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000692 # Neither commonName nor subjectAltName
693 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
694 'subject': ((('countryName', 'US'),),
695 (('stateOrProvinceName', 'California'),),
696 (('localityName', 'Mountain View'),),
697 (('organizationName', 'Google Inc'),))}
698 fail(cert, 'mail.google.com')
699
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200700 # No DNS entry in subjectAltName but a commonName
701 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
702 'subject': ((('countryName', 'US'),),
703 (('stateOrProvinceName', 'California'),),
704 (('localityName', 'Mountain View'),),
705 (('commonName', 'mail.google.com'),)),
706 'subjectAltName': (('othername', 'blabla'), )}
707 ok(cert, 'mail.google.com')
708
709 # No DNS entry subjectAltName and no commonName
710 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
711 'subject': ((('countryName', 'US'),),
712 (('stateOrProvinceName', 'California'),),
713 (('localityName', 'Mountain View'),),
714 (('organizationName', 'Google Inc'),)),
715 'subjectAltName': (('othername', 'blabla'),)}
716 fail(cert, 'google.com')
717
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000718 # Empty cert / no cert
719 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
720 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
721
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200722 # Issue #17980: avoid denials of service by refusing more than one
723 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100724 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
725 with self.assertRaisesRegex(
726 ssl.CertificateError,
727 "partial wildcards in leftmost label are not supported"):
728 ssl.match_hostname(cert, 'axxb.example.com')
729
730 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
731 with self.assertRaisesRegex(
732 ssl.CertificateError,
733 "wildcard can only be present in the leftmost label"):
734 ssl.match_hostname(cert, 'www.sub.example.com')
735
736 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
737 with self.assertRaisesRegex(
738 ssl.CertificateError,
739 "too many wildcards"):
740 ssl.match_hostname(cert, 'axxbxxc.example.com')
741
742 cert = {'subject': ((('commonName', '*'),),)}
743 with self.assertRaisesRegex(
744 ssl.CertificateError,
745 "sole wildcard without additional labels are not support"):
746 ssl.match_hostname(cert, 'host')
747
748 cert = {'subject': ((('commonName', '*.com'),),)}
749 with self.assertRaisesRegex(
750 ssl.CertificateError,
751 r"hostname 'com' doesn't match '\*.com'"):
752 ssl.match_hostname(cert, 'com')
753
754 # extra checks for _inet_paton()
755 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
756 with self.assertRaises(ValueError):
757 ssl._inet_paton(invalid)
758 for ipaddr in ['127.0.0.1', '192.168.0.1']:
759 self.assertTrue(ssl._inet_paton(ipaddr))
760 if hasattr(socket, 'AF_INET6'):
761 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
762 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200763
Antoine Pitroud5323212010-10-22 18:19:07 +0000764 def test_server_side(self):
765 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200766 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000767 with socket.socket() as sock:
768 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
769 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000770
Antoine Pitroud6494802011-07-21 01:11:30 +0200771 def test_unknown_channel_binding(self):
772 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200773 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200774 c = socket.socket(socket.AF_INET)
775 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200776 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100777 with self.assertRaises(ValueError):
778 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200779 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200780
781 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
782 "'tls-unique' channel binding not available")
783 def test_tls_unique_channel_binding(self):
784 # unconnected should return None for known type
785 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200786 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100787 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200788 # the same for server-side
789 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200790 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100791 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200792
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600793 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200794 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600795 r = repr(ss)
796 with self.assertWarns(ResourceWarning) as cm:
797 ss = None
798 support.gc_collect()
799 self.assertIn(r, str(cm.warning.args[0]))
800
Christian Heimes6d7ad132013-06-09 18:02:55 +0200801 def test_get_default_verify_paths(self):
802 paths = ssl.get_default_verify_paths()
803 self.assertEqual(len(paths), 6)
804 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
805
806 with support.EnvironmentVarGuard() as env:
807 env["SSL_CERT_DIR"] = CAPATH
808 env["SSL_CERT_FILE"] = CERTFILE
809 paths = ssl.get_default_verify_paths()
810 self.assertEqual(paths.cafile, CERTFILE)
811 self.assertEqual(paths.capath, CAPATH)
812
Christian Heimes44109d72013-11-22 01:51:30 +0100813 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
814 def test_enum_certificates(self):
815 self.assertTrue(ssl.enum_certificates("CA"))
816 self.assertTrue(ssl.enum_certificates("ROOT"))
817
818 self.assertRaises(TypeError, ssl.enum_certificates)
819 self.assertRaises(WindowsError, ssl.enum_certificates, "")
820
Christian Heimesc2d65e12013-11-22 16:13:55 +0100821 trust_oids = set()
822 for storename in ("CA", "ROOT"):
823 store = ssl.enum_certificates(storename)
824 self.assertIsInstance(store, list)
825 for element in store:
826 self.assertIsInstance(element, tuple)
827 self.assertEqual(len(element), 3)
828 cert, enc, trust = element
829 self.assertIsInstance(cert, bytes)
830 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
831 self.assertIsInstance(trust, (set, bool))
832 if isinstance(trust, set):
833 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100834
835 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100836 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200837
Christian Heimes46bebee2013-06-09 19:03:31 +0200838 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100839 def test_enum_crls(self):
840 self.assertTrue(ssl.enum_crls("CA"))
841 self.assertRaises(TypeError, ssl.enum_crls)
842 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200843
Christian Heimes44109d72013-11-22 01:51:30 +0100844 crls = ssl.enum_crls("CA")
845 self.assertIsInstance(crls, list)
846 for element in crls:
847 self.assertIsInstance(element, tuple)
848 self.assertEqual(len(element), 2)
849 self.assertIsInstance(element[0], bytes)
850 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200851
Christian Heimes46bebee2013-06-09 19:03:31 +0200852
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100853 def test_asn1object(self):
854 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
855 '1.3.6.1.5.5.7.3.1')
856
857 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
858 self.assertEqual(val, expected)
859 self.assertEqual(val.nid, 129)
860 self.assertEqual(val.shortname, 'serverAuth')
861 self.assertEqual(val.longname, 'TLS Web Server Authentication')
862 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
863 self.assertIsInstance(val, ssl._ASN1Object)
864 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
865
866 val = ssl._ASN1Object.fromnid(129)
867 self.assertEqual(val, expected)
868 self.assertIsInstance(val, ssl._ASN1Object)
869 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100870 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
871 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100872 for i in range(1000):
873 try:
874 obj = ssl._ASN1Object.fromnid(i)
875 except ValueError:
876 pass
877 else:
878 self.assertIsInstance(obj.nid, int)
879 self.assertIsInstance(obj.shortname, str)
880 self.assertIsInstance(obj.longname, str)
881 self.assertIsInstance(obj.oid, (str, type(None)))
882
883 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
884 self.assertEqual(val, expected)
885 self.assertIsInstance(val, ssl._ASN1Object)
886 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
887 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
888 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100889 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
890 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100891
Christian Heimes72d28502013-11-23 13:56:58 +0100892 def test_purpose_enum(self):
893 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
894 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
895 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
896 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
897 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
898 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
899 '1.3.6.1.5.5.7.3.1')
900
901 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
902 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
903 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
904 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
905 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
906 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
907 '1.3.6.1.5.5.7.3.2')
908
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100909 def test_unsupported_dtls(self):
910 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
911 self.addCleanup(s.close)
912 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200913 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100914 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200915 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100916 with self.assertRaises(NotImplementedError) as cx:
917 ctx.wrap_socket(s)
918 self.assertEqual(str(cx.exception), "only stream sockets are supported")
919
Antoine Pitrouc695c952014-04-28 20:57:36 +0200920 def cert_time_ok(self, timestring, timestamp):
921 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
922
923 def cert_time_fail(self, timestring):
924 with self.assertRaises(ValueError):
925 ssl.cert_time_to_seconds(timestring)
926
927 @unittest.skipUnless(utc_offset(),
928 'local time needs to be different from UTC')
929 def test_cert_time_to_seconds_timezone(self):
930 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
931 # results if local timezone is not UTC
932 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
933 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
934
935 def test_cert_time_to_seconds(self):
936 timestring = "Jan 5 09:34:43 2018 GMT"
937 ts = 1515144883.0
938 self.cert_time_ok(timestring, ts)
939 # accept keyword parameter, assert its name
940 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
941 # accept both %e and %d (space or zero generated by strftime)
942 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
943 # case-insensitive
944 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
945 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
946 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
947 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
948 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
949 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
950 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
951 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
952
953 newyear_ts = 1230768000.0
954 # leap seconds
955 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
956 # same timestamp
957 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
958
959 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
960 # allow 60th second (even if it is not a leap second)
961 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
962 # allow 2nd leap second for compatibility with time.strptime()
963 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
964 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
965
Mike53f7a7c2017-12-14 14:04:53 +0300966 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200967 # 99991231235959Z (rfc 5280)
968 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
969
970 @support.run_with_locale('LC_ALL', '')
971 def test_cert_time_to_seconds_locale(self):
972 # `cert_time_to_seconds()` should be locale independent
973
974 def local_february_name():
975 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
976
977 if local_february_name().lower() == 'feb':
978 self.skipTest("locale-specific month name needs to be "
979 "different from C locale")
980
981 # locale-independent
982 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
983 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
984
Martin Panter3840b2a2016-03-27 01:53:46 +0000985 def test_connect_ex_error(self):
986 server = socket.socket(socket.AF_INET)
987 self.addCleanup(server.close)
988 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200989 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000990 cert_reqs=ssl.CERT_REQUIRED)
991 self.addCleanup(s.close)
992 rc = s.connect_ex((HOST, port))
993 # Issue #19919: Windows machines or VMs hosted on Windows
994 # machines sometimes return EWOULDBLOCK.
995 errors = (
996 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
997 errno.EWOULDBLOCK,
998 )
999 self.assertIn(rc, errors)
1000
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001001
Antoine Pitrou152efa22010-05-16 18:19:27 +00001002class ContextTests(unittest.TestCase):
1003
1004 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001005 for protocol in PROTOCOLS:
1006 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001007 ctx = ssl.SSLContext()
1008 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001009 self.assertRaises(ValueError, ssl.SSLContext, -1)
1010 self.assertRaises(ValueError, ssl.SSLContext, 42)
1011
1012 def test_protocol(self):
1013 for proto in PROTOCOLS:
1014 ctx = ssl.SSLContext(proto)
1015 self.assertEqual(ctx.protocol, proto)
1016
1017 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001018 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001019 ctx.set_ciphers("ALL")
1020 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001021 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001022 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001023
Christian Heimes892d66e2018-01-29 14:10:18 +01001024 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1025 "Test applies only to Python default ciphers")
1026 def test_python_ciphers(self):
1027 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1028 ciphers = ctx.get_ciphers()
1029 for suite in ciphers:
1030 name = suite['name']
1031 self.assertNotIn("PSK", name)
1032 self.assertNotIn("SRP", name)
1033 self.assertNotIn("MD5", name)
1034 self.assertNotIn("RC4", name)
1035 self.assertNotIn("3DES", name)
1036
Christian Heimes25bfcd52016-09-06 00:04:45 +02001037 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1038 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001040 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001042 self.assertIn('AES256-GCM-SHA384', names)
1043 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001044
Antoine Pitroub5218772010-05-21 09:56:06 +00001045 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001046 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001047 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001048 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001049 # SSLContext also enables these by default
1050 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001051 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1052 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001053 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001054 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001055 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001056 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001057 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1058 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001059 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001060 # Ubuntu has OP_NO_SSLv3 forced on by default
1061 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001062 else:
1063 with self.assertRaises(ValueError):
1064 ctx.options = 0
1065
Christian Heimesa170fa12017-09-15 20:27:30 +02001066 def test_verify_mode_protocol(self):
1067 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001068 # Default value
1069 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1070 ctx.verify_mode = ssl.CERT_OPTIONAL
1071 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1072 ctx.verify_mode = ssl.CERT_REQUIRED
1073 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1074 ctx.verify_mode = ssl.CERT_NONE
1075 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1076 with self.assertRaises(TypeError):
1077 ctx.verify_mode = None
1078 with self.assertRaises(ValueError):
1079 ctx.verify_mode = 42
1080
Christian Heimesa170fa12017-09-15 20:27:30 +02001081 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1082 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1083 self.assertFalse(ctx.check_hostname)
1084
1085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1087 self.assertTrue(ctx.check_hostname)
1088
Christian Heimes61d478c2018-01-27 15:51:38 +01001089 def test_hostname_checks_common_name(self):
1090 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1091 self.assertTrue(ctx.hostname_checks_common_name)
1092 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1093 ctx.hostname_checks_common_name = True
1094 self.assertTrue(ctx.hostname_checks_common_name)
1095 ctx.hostname_checks_common_name = False
1096 self.assertFalse(ctx.hostname_checks_common_name)
1097 ctx.hostname_checks_common_name = True
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 else:
1100 with self.assertRaises(AttributeError):
1101 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001102
Christian Heimes698dde12018-02-27 11:54:43 +01001103 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1104 "required OpenSSL 1.1.0g")
1105 def test_min_max_version(self):
1106 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001107 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1108 # Fedora override the setting to TLS 1.0.
1109 self.assertIn(
1110 ctx.minimum_version,
Victor Stinner3ef63442019-02-19 18:06:03 +01001111 {ssl.TLSVersion.MINIMUM_SUPPORTED,
1112 # Fedora 29 uses TLS 1.0 by default
1113 ssl.TLSVersion.TLSv1,
1114 # RHEL 8 uses TLS 1.2 by default
1115 ssl.TLSVersion.TLSv1_2}
Christian Heimes698dde12018-02-27 11:54:43 +01001116 )
1117 self.assertEqual(
1118 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1119 )
1120
1121 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1122 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1123 self.assertEqual(
1124 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1125 )
1126 self.assertEqual(
1127 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1128 )
1129
1130 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1131 ctx.maximum_version = ssl.TLSVersion.TLSv1
1132 self.assertEqual(
1133 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1134 )
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.TLSv1
1137 )
1138
1139 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1140 self.assertEqual(
1141 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1142 )
1143
1144 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1145 self.assertIn(
1146 ctx.maximum_version,
1147 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1148 )
1149
1150 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1151 self.assertIn(
1152 ctx.minimum_version,
1153 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1154 )
1155
1156 with self.assertRaises(ValueError):
1157 ctx.minimum_version = 42
1158
1159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1160
1161 self.assertEqual(
1162 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1163 )
1164 self.assertEqual(
1165 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1166 )
1167 with self.assertRaises(ValueError):
1168 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1169 with self.assertRaises(ValueError):
1170 ctx.maximum_version = ssl.TLSVersion.TLSv1
1171
1172
Christian Heimes2427b502013-11-23 11:24:32 +01001173 @unittest.skipUnless(have_verify_flags(),
1174 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001175 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001177 # default value
1178 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1179 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001180 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1181 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1182 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1183 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1184 ctx.verify_flags = ssl.VERIFY_DEFAULT
1185 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1186 # supports any value
1187 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1188 self.assertEqual(ctx.verify_flags,
1189 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1190 with self.assertRaises(TypeError):
1191 ctx.verify_flags = None
1192
Antoine Pitrou152efa22010-05-16 18:19:27 +00001193 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001194 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001195 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001196 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001197 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1198 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001199 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001200 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001201 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001202 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001203 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001204 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001205 ctx.load_cert_chain(EMPTYCERT)
1206 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001207 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001208 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1209 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1210 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001211 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001212 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001213 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001214 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001215 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001216 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1217 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001218 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001219 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001220 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001221 # Password protected key and cert
1222 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1223 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1224 ctx.load_cert_chain(CERTFILE_PROTECTED,
1225 password=bytearray(KEY_PASSWORD.encode()))
1226 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1227 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1228 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1229 bytearray(KEY_PASSWORD.encode()))
1230 with self.assertRaisesRegex(TypeError, "should be a string"):
1231 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1232 with self.assertRaises(ssl.SSLError):
1233 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1234 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1235 # openssl has a fixed limit on the password buffer.
1236 # PEM_BUFSIZE is generally set to 1kb.
1237 # Return a string larger than this.
1238 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1239 # Password callback
1240 def getpass_unicode():
1241 return KEY_PASSWORD
1242 def getpass_bytes():
1243 return KEY_PASSWORD.encode()
1244 def getpass_bytearray():
1245 return bytearray(KEY_PASSWORD.encode())
1246 def getpass_badpass():
1247 return "badpass"
1248 def getpass_huge():
1249 return b'a' * (1024 * 1024)
1250 def getpass_bad_type():
1251 return 9
1252 def getpass_exception():
1253 raise Exception('getpass error')
1254 class GetPassCallable:
1255 def __call__(self):
1256 return KEY_PASSWORD
1257 def getpass(self):
1258 return KEY_PASSWORD
1259 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1260 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1261 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1262 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1263 ctx.load_cert_chain(CERTFILE_PROTECTED,
1264 password=GetPassCallable().getpass)
1265 with self.assertRaises(ssl.SSLError):
1266 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1267 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1268 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1269 with self.assertRaisesRegex(TypeError, "must return a string"):
1270 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1271 with self.assertRaisesRegex(Exception, "getpass error"):
1272 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1273 # Make sure the password function isn't called if it isn't needed
1274 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001275
1276 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001277 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001278 ctx.load_verify_locations(CERTFILE)
1279 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1280 ctx.load_verify_locations(BYTES_CERTFILE)
1281 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1282 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001283 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001284 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001285 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001286 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001287 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001288 ctx.load_verify_locations(BADCERT)
1289 ctx.load_verify_locations(CERTFILE, CAPATH)
1290 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1291
Victor Stinner80f75e62011-01-29 11:31:20 +00001292 # Issue #10989: crash if the second argument type is invalid
1293 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1294
Christian Heimesefff7062013-11-21 03:35:02 +01001295 def test_load_verify_cadata(self):
1296 # test cadata
1297 with open(CAFILE_CACERT) as f:
1298 cacert_pem = f.read()
1299 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1300 with open(CAFILE_NEURONIO) as f:
1301 neuronio_pem = f.read()
1302 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1303
1304 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001305 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001306 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1307 ctx.load_verify_locations(cadata=cacert_pem)
1308 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1309 ctx.load_verify_locations(cadata=neuronio_pem)
1310 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1311 # cert already in hash table
1312 ctx.load_verify_locations(cadata=neuronio_pem)
1313 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1314
1315 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001316 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001317 combined = "\n".join((cacert_pem, neuronio_pem))
1318 ctx.load_verify_locations(cadata=combined)
1319 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1320
1321 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001322 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001323 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1324 neuronio_pem, "tail"]
1325 ctx.load_verify_locations(cadata="\n".join(combined))
1326 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1327
1328 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001329 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001330 ctx.load_verify_locations(cadata=cacert_der)
1331 ctx.load_verify_locations(cadata=neuronio_der)
1332 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1333 # cert already in hash table
1334 ctx.load_verify_locations(cadata=cacert_der)
1335 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1336
1337 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001338 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001339 combined = b"".join((cacert_der, neuronio_der))
1340 ctx.load_verify_locations(cadata=combined)
1341 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1342
1343 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001344 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001345 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1346
1347 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1348 ctx.load_verify_locations(cadata="broken")
1349 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1350 ctx.load_verify_locations(cadata=b"broken")
1351
1352
Paul Monsonf3550692019-06-19 13:09:54 -07001353 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001354 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001355 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001356 ctx.load_dh_params(DHFILE)
1357 if os.name != 'nt':
1358 ctx.load_dh_params(BYTES_DHFILE)
1359 self.assertRaises(TypeError, ctx.load_dh_params)
1360 self.assertRaises(TypeError, ctx.load_dh_params, None)
1361 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001362 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001363 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001364 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001365 ctx.load_dh_params(CERTFILE)
1366
Antoine Pitroub0182c82010-10-12 20:09:02 +00001367 def test_session_stats(self):
1368 for proto in PROTOCOLS:
1369 ctx = ssl.SSLContext(proto)
1370 self.assertEqual(ctx.session_stats(), {
1371 'number': 0,
1372 'connect': 0,
1373 'connect_good': 0,
1374 'connect_renegotiate': 0,
1375 'accept': 0,
1376 'accept_good': 0,
1377 'accept_renegotiate': 0,
1378 'hits': 0,
1379 'misses': 0,
1380 'timeouts': 0,
1381 'cache_full': 0,
1382 })
1383
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001384 def test_set_default_verify_paths(self):
1385 # There's not much we can do to test that it acts as expected,
1386 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001387 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001388 ctx.set_default_verify_paths()
1389
Antoine Pitrou501da612011-12-21 09:27:41 +01001390 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001391 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001392 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001393 ctx.set_ecdh_curve("prime256v1")
1394 ctx.set_ecdh_curve(b"prime256v1")
1395 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1396 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1397 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1398 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1399
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001400 @needs_sni
1401 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001402 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001403
1404 # set_servername_callback expects a callable, or None
1405 self.assertRaises(TypeError, ctx.set_servername_callback)
1406 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1407 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1408 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1409
1410 def dummycallback(sock, servername, ctx):
1411 pass
1412 ctx.set_servername_callback(None)
1413 ctx.set_servername_callback(dummycallback)
1414
1415 @needs_sni
1416 def test_sni_callback_refcycle(self):
1417 # Reference cycles through the servername callback are detected
1418 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001419 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001420 def dummycallback(sock, servername, ctx, cycle=ctx):
1421 pass
1422 ctx.set_servername_callback(dummycallback)
1423 wr = weakref.ref(ctx)
1424 del ctx, dummycallback
1425 gc.collect()
1426 self.assertIs(wr(), None)
1427
Christian Heimes9a5395a2013-06-17 15:44:12 +02001428 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001430 self.assertEqual(ctx.cert_store_stats(),
1431 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1432 ctx.load_cert_chain(CERTFILE)
1433 self.assertEqual(ctx.cert_store_stats(),
1434 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1435 ctx.load_verify_locations(CERTFILE)
1436 self.assertEqual(ctx.cert_store_stats(),
1437 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001438 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001439 self.assertEqual(ctx.cert_store_stats(),
1440 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1441
1442 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001443 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001444 self.assertEqual(ctx.get_ca_certs(), [])
1445 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1446 ctx.load_verify_locations(CERTFILE)
1447 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001448 # but CAFILE_CACERT is a CA cert
1449 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001450 self.assertEqual(ctx.get_ca_certs(),
1451 [{'issuer': ((('organizationName', 'Root CA'),),
1452 (('organizationalUnitName', 'http://www.cacert.org'),),
1453 (('commonName', 'CA Cert Signing Authority'),),
1454 (('emailAddress', 'support@cacert.org'),)),
1455 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1456 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1457 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001458 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001459 'subject': ((('organizationName', 'Root CA'),),
1460 (('organizationalUnitName', 'http://www.cacert.org'),),
1461 (('commonName', 'CA Cert Signing Authority'),),
1462 (('emailAddress', 'support@cacert.org'),)),
1463 'version': 3}])
1464
Martin Panterb55f8b72016-01-14 12:53:56 +00001465 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001466 pem = f.read()
1467 der = ssl.PEM_cert_to_DER_cert(pem)
1468 self.assertEqual(ctx.get_ca_certs(True), [der])
1469
Christian Heimes72d28502013-11-23 13:56:58 +01001470 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001471 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001472 ctx.load_default_certs()
1473
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1476 ctx.load_default_certs()
1477
Christian Heimesa170fa12017-09-15 20:27:30 +02001478 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001479 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1480
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001482 self.assertRaises(TypeError, ctx.load_default_certs, None)
1483 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1484
Benjamin Peterson91244e02014-10-03 18:17:15 -04001485 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001486 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001487 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001488 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001489 with support.EnvironmentVarGuard() as env:
1490 env["SSL_CERT_DIR"] = CAPATH
1491 env["SSL_CERT_FILE"] = CERTFILE
1492 ctx.load_default_certs()
1493 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1494
Benjamin Peterson91244e02014-10-03 18:17:15 -04001495 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001496 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001497 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001498 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001499 ctx.load_default_certs()
1500 stats = ctx.cert_store_stats()
1501
Christian Heimesa170fa12017-09-15 20:27:30 +02001502 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001503 with support.EnvironmentVarGuard() as env:
1504 env["SSL_CERT_DIR"] = CAPATH
1505 env["SSL_CERT_FILE"] = CERTFILE
1506 ctx.load_default_certs()
1507 stats["x509"] += 1
1508 self.assertEqual(ctx.cert_store_stats(), stats)
1509
Christian Heimes358cfd42016-09-10 22:43:48 +02001510 def _assert_context_options(self, ctx):
1511 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1512 if OP_NO_COMPRESSION != 0:
1513 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1514 OP_NO_COMPRESSION)
1515 if OP_SINGLE_DH_USE != 0:
1516 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1517 OP_SINGLE_DH_USE)
1518 if OP_SINGLE_ECDH_USE != 0:
1519 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1520 OP_SINGLE_ECDH_USE)
1521 if OP_CIPHER_SERVER_PREFERENCE != 0:
1522 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1523 OP_CIPHER_SERVER_PREFERENCE)
1524
Christian Heimes4c05b472013-11-23 15:58:30 +01001525 def test_create_default_context(self):
1526 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001527
Christian Heimesa170fa12017-09-15 20:27:30 +02001528 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001529 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001530 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001531 self._assert_context_options(ctx)
1532
Christian Heimes4c05b472013-11-23 15:58:30 +01001533 with open(SIGNING_CA) as f:
1534 cadata = f.read()
1535 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1536 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001537 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001538 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001539 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001540
1541 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001543 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001544 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001545
Christian Heimes67986f92013-11-23 22:43:47 +01001546 def test__create_stdlib_context(self):
1547 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001548 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001549 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001550 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001551 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001552
1553 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1554 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1555 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001556 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001557
1558 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001559 cert_reqs=ssl.CERT_REQUIRED,
1560 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001561 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001563 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001564 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001565
1566 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001567 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001568 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001569 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001570
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001573 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001574 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575
Christian Heimese82c0342017-09-15 20:29:57 +02001576 # Auto set CERT_REQUIRED
1577 ctx.check_hostname = True
1578 self.assertTrue(ctx.check_hostname)
1579 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1580 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001581 ctx.verify_mode = ssl.CERT_REQUIRED
1582 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001583 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
Christian Heimese82c0342017-09-15 20:29:57 +02001585 # Changing verify_mode does not affect check_hostname
1586 ctx.check_hostname = False
1587 ctx.verify_mode = ssl.CERT_NONE
1588 ctx.check_hostname = False
1589 self.assertFalse(ctx.check_hostname)
1590 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1591 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001592 ctx.check_hostname = True
1593 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001594 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1595
1596 ctx.check_hostname = False
1597 ctx.verify_mode = ssl.CERT_OPTIONAL
1598 ctx.check_hostname = False
1599 self.assertFalse(ctx.check_hostname)
1600 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1601 # keep CERT_OPTIONAL
1602 ctx.check_hostname = True
1603 self.assertTrue(ctx.check_hostname)
1604 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001605
1606 # Cannot set CERT_NONE with check_hostname enabled
1607 with self.assertRaises(ValueError):
1608 ctx.verify_mode = ssl.CERT_NONE
1609 ctx.check_hostname = False
1610 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001611 ctx.verify_mode = ssl.CERT_NONE
1612 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001613
Christian Heimes5fe668c2016-09-12 00:01:11 +02001614 def test_context_client_server(self):
1615 # PROTOCOL_TLS_CLIENT has sane defaults
1616 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1617 self.assertTrue(ctx.check_hostname)
1618 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1619
1620 # PROTOCOL_TLS_SERVER has different but also sane defaults
1621 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1622 self.assertFalse(ctx.check_hostname)
1623 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1624
Christian Heimes4df60f12017-09-15 20:26:05 +02001625 def test_context_custom_class(self):
1626 class MySSLSocket(ssl.SSLSocket):
1627 pass
1628
1629 class MySSLObject(ssl.SSLObject):
1630 pass
1631
1632 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1633 ctx.sslsocket_class = MySSLSocket
1634 ctx.sslobject_class = MySSLObject
1635
1636 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1637 self.assertIsInstance(sock, MySSLSocket)
1638 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1639 self.assertIsInstance(obj, MySSLObject)
1640
Christian Heimes78c7d522019-06-03 21:00:10 +02001641 @unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
1642 def test_num_tickest(self):
1643 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1644 self.assertEqual(ctx.num_tickets, 2)
1645 ctx.num_tickets = 1
1646 self.assertEqual(ctx.num_tickets, 1)
1647 ctx.num_tickets = 0
1648 self.assertEqual(ctx.num_tickets, 0)
1649 with self.assertRaises(ValueError):
1650 ctx.num_tickets = -1
1651 with self.assertRaises(TypeError):
1652 ctx.num_tickets = None
1653
1654 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1655 self.assertEqual(ctx.num_tickets, 2)
1656 with self.assertRaises(ValueError):
1657 ctx.num_tickets = 1
1658
Antoine Pitrou152efa22010-05-16 18:19:27 +00001659
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001660class SSLErrorTests(unittest.TestCase):
1661
1662 def test_str(self):
1663 # The str() of a SSLError doesn't include the errno
1664 e = ssl.SSLError(1, "foo")
1665 self.assertEqual(str(e), "foo")
1666 self.assertEqual(e.errno, 1)
1667 # Same for a subclass
1668 e = ssl.SSLZeroReturnError(1, "foo")
1669 self.assertEqual(str(e), "foo")
1670 self.assertEqual(e.errno, 1)
1671
Paul Monsonf3550692019-06-19 13:09:54 -07001672 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001673 def test_lib_reason(self):
1674 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001675 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001676 with self.assertRaises(ssl.SSLError) as cm:
1677 ctx.load_dh_params(CERTFILE)
1678 self.assertEqual(cm.exception.library, 'PEM')
1679 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1680 s = str(cm.exception)
1681 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1682
1683 def test_subclass(self):
1684 # Check that the appropriate SSLError subclass is raised
1685 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001686 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1687 ctx.check_hostname = False
1688 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001689 with socket.create_server(("127.0.0.1", 0)) as s:
1690 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001691 c.setblocking(False)
1692 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001693 with self.assertRaises(ssl.SSLWantReadError) as cm:
1694 c.do_handshake()
1695 s = str(cm.exception)
1696 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1697 # For compatibility
1698 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1699
1700
Christian Heimes61d478c2018-01-27 15:51:38 +01001701 def test_bad_server_hostname(self):
1702 ctx = ssl.create_default_context()
1703 with self.assertRaises(ValueError):
1704 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1705 server_hostname="")
1706 with self.assertRaises(ValueError):
1707 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1708 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001709 with self.assertRaises(TypeError):
1710 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1711 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001712
1713
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001714class MemoryBIOTests(unittest.TestCase):
1715
1716 def test_read_write(self):
1717 bio = ssl.MemoryBIO()
1718 bio.write(b'foo')
1719 self.assertEqual(bio.read(), b'foo')
1720 self.assertEqual(bio.read(), b'')
1721 bio.write(b'foo')
1722 bio.write(b'bar')
1723 self.assertEqual(bio.read(), b'foobar')
1724 self.assertEqual(bio.read(), b'')
1725 bio.write(b'baz')
1726 self.assertEqual(bio.read(2), b'ba')
1727 self.assertEqual(bio.read(1), b'z')
1728 self.assertEqual(bio.read(1), b'')
1729
1730 def test_eof(self):
1731 bio = ssl.MemoryBIO()
1732 self.assertFalse(bio.eof)
1733 self.assertEqual(bio.read(), b'')
1734 self.assertFalse(bio.eof)
1735 bio.write(b'foo')
1736 self.assertFalse(bio.eof)
1737 bio.write_eof()
1738 self.assertFalse(bio.eof)
1739 self.assertEqual(bio.read(2), b'fo')
1740 self.assertFalse(bio.eof)
1741 self.assertEqual(bio.read(1), b'o')
1742 self.assertTrue(bio.eof)
1743 self.assertEqual(bio.read(), b'')
1744 self.assertTrue(bio.eof)
1745
1746 def test_pending(self):
1747 bio = ssl.MemoryBIO()
1748 self.assertEqual(bio.pending, 0)
1749 bio.write(b'foo')
1750 self.assertEqual(bio.pending, 3)
1751 for i in range(3):
1752 bio.read(1)
1753 self.assertEqual(bio.pending, 3-i-1)
1754 for i in range(3):
1755 bio.write(b'x')
1756 self.assertEqual(bio.pending, i+1)
1757 bio.read()
1758 self.assertEqual(bio.pending, 0)
1759
1760 def test_buffer_types(self):
1761 bio = ssl.MemoryBIO()
1762 bio.write(b'foo')
1763 self.assertEqual(bio.read(), b'foo')
1764 bio.write(bytearray(b'bar'))
1765 self.assertEqual(bio.read(), b'bar')
1766 bio.write(memoryview(b'baz'))
1767 self.assertEqual(bio.read(), b'baz')
1768
1769 def test_error_types(self):
1770 bio = ssl.MemoryBIO()
1771 self.assertRaises(TypeError, bio.write, 'foo')
1772 self.assertRaises(TypeError, bio.write, None)
1773 self.assertRaises(TypeError, bio.write, True)
1774 self.assertRaises(TypeError, bio.write, 1)
1775
1776
Christian Heimes9d50ab52018-02-27 10:17:30 +01001777class SSLObjectTests(unittest.TestCase):
1778 def test_private_init(self):
1779 bio = ssl.MemoryBIO()
1780 with self.assertRaisesRegex(TypeError, "public constructor"):
1781 ssl.SSLObject(bio, bio)
1782
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001783 def test_unwrap(self):
1784 client_ctx, server_ctx, hostname = testing_context()
1785 c_in = ssl.MemoryBIO()
1786 c_out = ssl.MemoryBIO()
1787 s_in = ssl.MemoryBIO()
1788 s_out = ssl.MemoryBIO()
1789 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1790 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1791
1792 # Loop on the handshake for a bit to get it settled
1793 for _ in range(5):
1794 try:
1795 client.do_handshake()
1796 except ssl.SSLWantReadError:
1797 pass
1798 if c_out.pending:
1799 s_in.write(c_out.read())
1800 try:
1801 server.do_handshake()
1802 except ssl.SSLWantReadError:
1803 pass
1804 if s_out.pending:
1805 c_in.write(s_out.read())
1806 # Now the handshakes should be complete (don't raise WantReadError)
1807 client.do_handshake()
1808 server.do_handshake()
1809
1810 # Now if we unwrap one side unilaterally, it should send close-notify
1811 # and raise WantReadError:
1812 with self.assertRaises(ssl.SSLWantReadError):
1813 client.unwrap()
1814
1815 # But server.unwrap() does not raise, because it reads the client's
1816 # close-notify:
1817 s_in.write(c_out.read())
1818 server.unwrap()
1819
1820 # And now that the client gets the server's close-notify, it doesn't
1821 # raise either.
1822 c_in.write(s_out.read())
1823 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001824
Martin Panter3840b2a2016-03-27 01:53:46 +00001825class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001826 """Tests that connect to a simple server running in the background"""
1827
1828 def setUp(self):
1829 server = ThreadedEchoServer(SIGNED_CERTFILE)
1830 self.server_addr = (HOST, server.port)
1831 server.__enter__()
1832 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001833
Antoine Pitrou480a1242010-04-28 21:37:09 +00001834 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001835 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001836 cert_reqs=ssl.CERT_NONE) as s:
1837 s.connect(self.server_addr)
1838 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001839 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001840
Martin Panter3840b2a2016-03-27 01:53:46 +00001841 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001842 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001843 cert_reqs=ssl.CERT_REQUIRED,
1844 ca_certs=SIGNING_CA) as s:
1845 s.connect(self.server_addr)
1846 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001847 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001848
Martin Panter3840b2a2016-03-27 01:53:46 +00001849 def test_connect_fail(self):
1850 # This should fail because we have no verification certs. Connection
1851 # failure crashes ThreadedEchoServer, so run this in an independent
1852 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001853 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001854 cert_reqs=ssl.CERT_REQUIRED)
1855 self.addCleanup(s.close)
1856 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1857 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001858
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001859 def test_connect_ex(self):
1860 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001861 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001862 cert_reqs=ssl.CERT_REQUIRED,
1863 ca_certs=SIGNING_CA)
1864 self.addCleanup(s.close)
1865 self.assertEqual(0, s.connect_ex(self.server_addr))
1866 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001867
1868 def test_non_blocking_connect_ex(self):
1869 # Issue #11326: non-blocking connect_ex() should allow handshake
1870 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001871 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 cert_reqs=ssl.CERT_REQUIRED,
1873 ca_certs=SIGNING_CA,
1874 do_handshake_on_connect=False)
1875 self.addCleanup(s.close)
1876 s.setblocking(False)
1877 rc = s.connect_ex(self.server_addr)
1878 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1879 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1880 # Wait for connect to finish
1881 select.select([], [s], [], 5.0)
1882 # Non-blocking handshake
1883 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001884 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001885 s.do_handshake()
1886 break
1887 except ssl.SSLWantReadError:
1888 select.select([s], [], [], 5.0)
1889 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001890 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001891 # SSL established
1892 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001893
Antoine Pitrou152efa22010-05-16 18:19:27 +00001894 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001895 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001896 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001897 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1898 s.connect(self.server_addr)
1899 self.assertEqual({}, s.getpeercert())
1900 # Same with a server hostname
1901 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1902 server_hostname="dummy") as s:
1903 s.connect(self.server_addr)
1904 ctx.verify_mode = ssl.CERT_REQUIRED
1905 # This should succeed because we specify the root cert
1906 ctx.load_verify_locations(SIGNING_CA)
1907 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1908 s.connect(self.server_addr)
1909 cert = s.getpeercert()
1910 self.assertTrue(cert)
1911
1912 def test_connect_with_context_fail(self):
1913 # This should fail because we have no verification certs. Connection
1914 # failure crashes ThreadedEchoServer, so run this in an independent
1915 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001916 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001917 ctx.verify_mode = ssl.CERT_REQUIRED
1918 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1919 self.addCleanup(s.close)
1920 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1921 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001922
1923 def test_connect_capath(self):
1924 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001925 # NOTE: the subject hashing algorithm has been changed between
1926 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1927 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001928 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001929 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001930 ctx.verify_mode = ssl.CERT_REQUIRED
1931 ctx.load_verify_locations(capath=CAPATH)
1932 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1933 s.connect(self.server_addr)
1934 cert = s.getpeercert()
1935 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001936
Martin Panter3840b2a2016-03-27 01:53:46 +00001937 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001938 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001939 ctx.verify_mode = ssl.CERT_REQUIRED
1940 ctx.load_verify_locations(capath=BYTES_CAPATH)
1941 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1942 s.connect(self.server_addr)
1943 cert = s.getpeercert()
1944 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001945
Christian Heimesefff7062013-11-21 03:35:02 +01001946 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001947 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001948 pem = f.read()
1949 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001950 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001951 ctx.verify_mode = ssl.CERT_REQUIRED
1952 ctx.load_verify_locations(cadata=pem)
1953 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1954 s.connect(self.server_addr)
1955 cert = s.getpeercert()
1956 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001957
Martin Panter3840b2a2016-03-27 01:53:46 +00001958 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001959 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001960 ctx.verify_mode = ssl.CERT_REQUIRED
1961 ctx.load_verify_locations(cadata=der)
1962 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1963 s.connect(self.server_addr)
1964 cert = s.getpeercert()
1965 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001966
Antoine Pitroue3220242010-04-24 11:13:53 +00001967 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1968 def test_makefile_close(self):
1969 # Issue #5238: creating a file-like object with makefile() shouldn't
1970 # delay closing the underlying "real socket" (here tested with its
1971 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001972 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001973 ss.connect(self.server_addr)
1974 fd = ss.fileno()
1975 f = ss.makefile()
1976 f.close()
1977 # The fd is still open
1978 os.read(fd, 0)
1979 # Closing the SSL socket should close the fd too
1980 ss.close()
1981 gc.collect()
1982 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001983 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001984 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001985
Antoine Pitrou480a1242010-04-28 21:37:09 +00001986 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001987 s = socket.socket(socket.AF_INET)
1988 s.connect(self.server_addr)
1989 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001990 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001991 cert_reqs=ssl.CERT_NONE,
1992 do_handshake_on_connect=False)
1993 self.addCleanup(s.close)
1994 count = 0
1995 while True:
1996 try:
1997 count += 1
1998 s.do_handshake()
1999 break
2000 except ssl.SSLWantReadError:
2001 select.select([s], [], [])
2002 except ssl.SSLWantWriteError:
2003 select.select([], [s], [])
2004 if support.verbose:
2005 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002006
Antoine Pitrou480a1242010-04-28 21:37:09 +00002007 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00002008 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002009
Martin Panter3840b2a2016-03-27 01:53:46 +00002010 def test_get_server_certificate_fail(self):
2011 # Connection failure crashes ThreadedEchoServer, so run this in an
2012 # independent test method
2013 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002014
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00002015 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02002016 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002017 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
2018 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02002019 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002020 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
2021 s.connect(self.server_addr)
2022 # Error checking can happen at instantiation or when connecting
2023 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
2024 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002025 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002026 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2027 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002028
Christian Heimes9a5395a2013-06-17 15:44:12 +02002029 def test_get_ca_certs_capath(self):
2030 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002031 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002032 ctx.load_verify_locations(capath=CAPATH)
2033 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002034 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2035 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002036 s.connect(self.server_addr)
2037 cert = s.getpeercert()
2038 self.assertTrue(cert)
2039 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002040
Christian Heimes575596e2013-12-15 21:49:17 +01002041 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002042 def test_context_setget(self):
2043 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002044 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2045 ctx1.load_verify_locations(capath=CAPATH)
2046 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2047 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002048 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002049 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002050 ss.connect(self.server_addr)
2051 self.assertIs(ss.context, ctx1)
2052 self.assertIs(ss._sslobj.context, ctx1)
2053 ss.context = ctx2
2054 self.assertIs(ss.context, ctx2)
2055 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002056
2057 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2058 # A simple IO loop. Call func(*args) depending on the error we get
2059 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2060 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002061 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002062 count = 0
2063 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002064 if time.monotonic() > deadline:
2065 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002066 errno = None
2067 count += 1
2068 try:
2069 ret = func(*args)
2070 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002071 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002072 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002073 raise
2074 errno = e.errno
2075 # Get any data from the outgoing BIO irrespective of any error, and
2076 # send it to the socket.
2077 buf = outgoing.read()
2078 sock.sendall(buf)
2079 # If there's no error, we're done. For WANT_READ, we need to get
2080 # data from the socket and put it in the incoming BIO.
2081 if errno is None:
2082 break
2083 elif errno == ssl.SSL_ERROR_WANT_READ:
2084 buf = sock.recv(32768)
2085 if buf:
2086 incoming.write(buf)
2087 else:
2088 incoming.write_eof()
2089 if support.verbose:
2090 sys.stdout.write("Needed %d calls to complete %s().\n"
2091 % (count, func.__name__))
2092 return ret
2093
Martin Panter3840b2a2016-03-27 01:53:46 +00002094 def test_bio_handshake(self):
2095 sock = socket.socket(socket.AF_INET)
2096 self.addCleanup(sock.close)
2097 sock.connect(self.server_addr)
2098 incoming = ssl.MemoryBIO()
2099 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002100 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2101 self.assertTrue(ctx.check_hostname)
2102 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002103 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002104 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2105 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002106 self.assertIs(sslobj._sslobj.owner, sslobj)
2107 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002108 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002109 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 self.assertRaises(ValueError, sslobj.getpeercert)
2111 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2112 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2113 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2114 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002115 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002116 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002117 self.assertTrue(sslobj.getpeercert())
2118 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2119 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2120 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002121 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002122 except ssl.SSLSyscallError:
2123 # If the server shuts down the TCP connection without sending a
2124 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2125 pass
2126 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2127
2128 def test_bio_read_write_data(self):
2129 sock = socket.socket(socket.AF_INET)
2130 self.addCleanup(sock.close)
2131 sock.connect(self.server_addr)
2132 incoming = ssl.MemoryBIO()
2133 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002134 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002135 ctx.verify_mode = ssl.CERT_NONE
2136 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2137 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2138 req = b'FOO\n'
2139 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2140 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2141 self.assertEqual(buf, b'foo\n')
2142 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002143
2144
Martin Panter3840b2a2016-03-27 01:53:46 +00002145class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002146
Martin Panter3840b2a2016-03-27 01:53:46 +00002147 def test_timeout_connect_ex(self):
2148 # Issue #12065: on a timeout, connect_ex() should return the original
2149 # errno (mimicking the behaviour of non-SSL sockets).
2150 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002151 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002152 cert_reqs=ssl.CERT_REQUIRED,
2153 do_handshake_on_connect=False)
2154 self.addCleanup(s.close)
2155 s.settimeout(0.0000001)
2156 rc = s.connect_ex((REMOTE_HOST, 443))
2157 if rc == 0:
2158 self.skipTest("REMOTE_HOST responded too quickly")
2159 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2160
2161 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2162 def test_get_server_certificate_ipv6(self):
2163 with support.transient_internet('ipv6.google.com'):
2164 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2165 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2166
Martin Panter3840b2a2016-03-27 01:53:46 +00002167
2168def _test_get_server_certificate(test, host, port, cert=None):
2169 pem = ssl.get_server_certificate((host, port))
2170 if not pem:
2171 test.fail("No server certificate on %s:%s!" % (host, port))
2172
2173 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2174 if not pem:
2175 test.fail("No server certificate on %s:%s!" % (host, port))
2176 if support.verbose:
2177 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2178
2179def _test_get_server_certificate_fail(test, host, port):
2180 try:
2181 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2182 except ssl.SSLError as x:
2183 #should fail
2184 if support.verbose:
2185 sys.stdout.write("%s\n" % x)
2186 else:
2187 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2188
2189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002190from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002191
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002192class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002193
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002194 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002195
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002196 """A mildly complicated class, because we want it to work both
2197 with and without the SSL wrapper around the socket connection, so
2198 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002199
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002200 def __init__(self, server, connsock, addr):
2201 self.server = server
2202 self.running = False
2203 self.sock = connsock
2204 self.addr = addr
2205 self.sock.setblocking(1)
2206 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002207 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002208 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002209
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002210 def wrap_conn(self):
2211 try:
2212 self.sslconn = self.server.context.wrap_socket(
2213 self.sock, server_side=True)
2214 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2215 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002216 except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002217 # We treat ConnectionResetError as though it were an
2218 # SSLError - OpenSSL on Ubuntu abruptly closes the
2219 # connection when asked to use an unsupported protocol.
2220 #
Christian Heimes529525f2018-05-23 22:24:45 +02002221 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2222 # tries to send session tickets after handshake.
2223 # https://github.com/openssl/openssl/issues/6342
Paul Monsonfb7e7502019-05-15 15:38:55 -07002224 #
2225 # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
2226 # tries to send session tickets after handshake when using WinSock.
Christian Heimes529525f2018-05-23 22:24:45 +02002227 self.server.conn_errors.append(str(e))
2228 if self.server.chatty:
2229 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2230 self.running = False
2231 self.close()
2232 return False
2233 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002234 # OSError may occur with wrong protocols, e.g. both
2235 # sides use PROTOCOL_TLS_SERVER.
2236 #
2237 # XXX Various errors can have happened here, for example
2238 # a mismatching protocol version, an invalid certificate,
2239 # or a low-level bug. This should be made more discriminating.
2240 #
2241 # bpo-31323: Store the exception as string to prevent
2242 # a reference leak: server -> conn_errors -> exception
2243 # -> traceback -> self (ConnectionHandler) -> server
2244 self.server.conn_errors.append(str(e))
2245 if self.server.chatty:
2246 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2247 self.running = False
2248 self.server.stop()
2249 self.close()
2250 return False
2251 else:
2252 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2253 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2254 cert = self.sslconn.getpeercert()
2255 if support.verbose and self.server.chatty:
2256 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2257 cert_binary = self.sslconn.getpeercert(True)
2258 if support.verbose and self.server.chatty:
2259 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2260 cipher = self.sslconn.cipher()
2261 if support.verbose and self.server.chatty:
2262 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2263 sys.stdout.write(" server: selected protocol is now "
2264 + str(self.sslconn.selected_npn_protocol()) + "\n")
2265 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002266
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002267 def read(self):
2268 if self.sslconn:
2269 return self.sslconn.read()
2270 else:
2271 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002272
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002273 def write(self, bytes):
2274 if self.sslconn:
2275 return self.sslconn.write(bytes)
2276 else:
2277 return self.sock.send(bytes)
2278
2279 def close(self):
2280 if self.sslconn:
2281 self.sslconn.close()
2282 else:
2283 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002284
Antoine Pitrou480a1242010-04-28 21:37:09 +00002285 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002286 self.running = True
2287 if not self.server.starttls_server:
2288 if not self.wrap_conn():
2289 return
2290 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002291 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002292 msg = self.read()
2293 stripped = msg.strip()
2294 if not stripped:
2295 # eof, so quit this handler
2296 self.running = False
2297 try:
2298 self.sock = self.sslconn.unwrap()
2299 except OSError:
2300 # Many tests shut the TCP connection down
2301 # without an SSL shutdown. This causes
2302 # unwrap() to raise OSError with errno=0!
2303 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002304 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002305 self.sslconn = None
2306 self.close()
2307 elif stripped == b'over':
2308 if support.verbose and self.server.connectionchatty:
2309 sys.stdout.write(" server: client closed connection\n")
2310 self.close()
2311 return
2312 elif (self.server.starttls_server and
2313 stripped == b'STARTTLS'):
2314 if support.verbose and self.server.connectionchatty:
2315 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2316 self.write(b"OK\n")
2317 if not self.wrap_conn():
2318 return
2319 elif (self.server.starttls_server and self.sslconn
2320 and stripped == b'ENDTLS'):
2321 if support.verbose and self.server.connectionchatty:
2322 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2323 self.write(b"OK\n")
2324 self.sock = self.sslconn.unwrap()
2325 self.sslconn = None
2326 if support.verbose and self.server.connectionchatty:
2327 sys.stdout.write(" server: connection is now unencrypted...\n")
2328 elif stripped == b'CB tls-unique':
2329 if support.verbose and self.server.connectionchatty:
2330 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2331 data = self.sslconn.get_channel_binding("tls-unique")
2332 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002333 elif stripped == b'PHA':
2334 if support.verbose and self.server.connectionchatty:
2335 sys.stdout.write(" server: initiating post handshake auth\n")
2336 try:
2337 self.sslconn.verify_client_post_handshake()
2338 except ssl.SSLError as e:
2339 self.write(repr(e).encode("us-ascii") + b"\n")
2340 else:
2341 self.write(b"OK\n")
2342 elif stripped == b'HASCERT':
2343 if self.sslconn.getpeercert() is not None:
2344 self.write(b'TRUE\n')
2345 else:
2346 self.write(b'FALSE\n')
2347 elif stripped == b'GETCERT':
2348 cert = self.sslconn.getpeercert()
2349 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002350 else:
2351 if (support.verbose and
2352 self.server.connectionchatty):
2353 ctype = (self.sslconn and "encrypted") or "unencrypted"
2354 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2355 % (msg, ctype, msg.lower(), ctype))
2356 self.write(msg.lower())
Paul Monsonfb7e7502019-05-15 15:38:55 -07002357 except (ConnectionResetError, ConnectionAbortedError):
Christian Heimes529525f2018-05-23 22:24:45 +02002358 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2359 # when connection is not shut down gracefully.
2360 if self.server.chatty and support.verbose:
2361 sys.stdout.write(
2362 " Connection reset by peer: {}\n".format(
2363 self.addr)
2364 )
2365 self.close()
2366 self.running = False
Paul Monsonfb7e7502019-05-15 15:38:55 -07002367 except ssl.SSLError as err:
2368 # On Windows sometimes test_pha_required_nocert receives the
2369 # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
2370 # before the 'tlsv13 alert certificate required' exception.
2371 # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
2372 # is received test_pha_required_nocert fails with ConnectionResetError
2373 # because the underlying socket is closed
2374 if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
2375 if self.server.chatty and support.verbose:
2376 sys.stdout.write(err.args[1])
2377 # test_pha_required_nocert is expecting this exception
2378 raise ssl.SSLError('tlsv13 alert certificate required')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002379 except OSError:
2380 if self.server.chatty:
2381 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002382 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002383 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002384
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002385 # normally, we'd just stop here, but for the test
2386 # harness, we want to stop the server
2387 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002388
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002389 def __init__(self, certificate=None, ssl_version=None,
2390 certreqs=None, cacerts=None,
2391 chatty=True, connectionchatty=False, starttls_server=False,
2392 npn_protocols=None, alpn_protocols=None,
2393 ciphers=None, context=None):
2394 if context:
2395 self.context = context
2396 else:
2397 self.context = ssl.SSLContext(ssl_version
2398 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002399 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002400 self.context.verify_mode = (certreqs if certreqs is not None
2401 else ssl.CERT_NONE)
2402 if cacerts:
2403 self.context.load_verify_locations(cacerts)
2404 if certificate:
2405 self.context.load_cert_chain(certificate)
2406 if npn_protocols:
2407 self.context.set_npn_protocols(npn_protocols)
2408 if alpn_protocols:
2409 self.context.set_alpn_protocols(alpn_protocols)
2410 if ciphers:
2411 self.context.set_ciphers(ciphers)
2412 self.chatty = chatty
2413 self.connectionchatty = connectionchatty
2414 self.starttls_server = starttls_server
2415 self.sock = socket.socket()
2416 self.port = support.bind_port(self.sock)
2417 self.flag = None
2418 self.active = False
2419 self.selected_npn_protocols = []
2420 self.selected_alpn_protocols = []
2421 self.shared_ciphers = []
2422 self.conn_errors = []
2423 threading.Thread.__init__(self)
2424 self.daemon = True
2425
2426 def __enter__(self):
2427 self.start(threading.Event())
2428 self.flag.wait()
2429 return self
2430
2431 def __exit__(self, *args):
2432 self.stop()
2433 self.join()
2434
2435 def start(self, flag=None):
2436 self.flag = flag
2437 threading.Thread.start(self)
2438
2439 def run(self):
2440 self.sock.settimeout(0.05)
2441 self.sock.listen()
2442 self.active = True
2443 if self.flag:
2444 # signal an event
2445 self.flag.set()
2446 while self.active:
2447 try:
2448 newconn, connaddr = self.sock.accept()
2449 if support.verbose and self.chatty:
2450 sys.stdout.write(' server: new connection from '
2451 + repr(connaddr) + '\n')
2452 handler = self.ConnectionHandler(self, newconn, connaddr)
2453 handler.start()
2454 handler.join()
2455 except socket.timeout:
2456 pass
2457 except KeyboardInterrupt:
2458 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002459 except BaseException as e:
2460 if support.verbose and self.chatty:
2461 sys.stdout.write(
2462 ' connection handling failed: ' + repr(e) + '\n')
2463
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002464 self.sock.close()
2465
2466 def stop(self):
2467 self.active = False
2468
2469class AsyncoreEchoServer(threading.Thread):
2470
2471 # this one's based on asyncore.dispatcher
2472
2473 class EchoServer (asyncore.dispatcher):
2474
2475 class ConnectionHandler(asyncore.dispatcher_with_send):
2476
2477 def __init__(self, conn, certfile):
2478 self.socket = test_wrap_socket(conn, server_side=True,
2479 certfile=certfile,
2480 do_handshake_on_connect=False)
2481 asyncore.dispatcher_with_send.__init__(self, self.socket)
2482 self._ssl_accepting = True
2483 self._do_ssl_handshake()
2484
2485 def readable(self):
2486 if isinstance(self.socket, ssl.SSLSocket):
2487 while self.socket.pending() > 0:
2488 self.handle_read_event()
2489 return True
2490
2491 def _do_ssl_handshake(self):
2492 try:
2493 self.socket.do_handshake()
2494 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2495 return
2496 except ssl.SSLEOFError:
2497 return self.handle_close()
2498 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002499 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002500 except OSError as err:
2501 if err.args[0] == errno.ECONNABORTED:
2502 return self.handle_close()
2503 else:
2504 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002505
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002506 def handle_read(self):
2507 if self._ssl_accepting:
2508 self._do_ssl_handshake()
2509 else:
2510 data = self.recv(1024)
2511 if support.verbose:
2512 sys.stdout.write(" server: read %s from client\n" % repr(data))
2513 if not data:
2514 self.close()
2515 else:
2516 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002517
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002518 def handle_close(self):
2519 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002520 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002522
2523 def handle_error(self):
2524 raise
2525
Trent Nelson78520002008-04-10 20:54:35 +00002526 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 self.certfile = certfile
2528 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2529 self.port = support.bind_port(sock, '')
2530 asyncore.dispatcher.__init__(self, sock)
2531 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002532
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002533 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002534 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2536 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002537
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002538 def handle_error(self):
2539 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002540
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002541 def __init__(self, certfile):
2542 self.flag = None
2543 self.active = False
2544 self.server = self.EchoServer(certfile)
2545 self.port = self.server.port
2546 threading.Thread.__init__(self)
2547 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002548
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002549 def __str__(self):
2550 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002551
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002552 def __enter__(self):
2553 self.start(threading.Event())
2554 self.flag.wait()
2555 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002556
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002558 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002559 sys.stdout.write(" cleanup: stopping server.\n")
2560 self.stop()
2561 if support.verbose:
2562 sys.stdout.write(" cleanup: joining server thread.\n")
2563 self.join()
2564 if support.verbose:
2565 sys.stdout.write(" cleanup: successfully joined.\n")
2566 # make sure that ConnectionHandler is removed from socket_map
2567 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002568
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002569 def start (self, flag=None):
2570 self.flag = flag
2571 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002572
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002573 def run(self):
2574 self.active = True
2575 if self.flag:
2576 self.flag.set()
2577 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002578 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002579 asyncore.loop(1)
2580 except:
2581 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002582
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002583 def stop(self):
2584 self.active = False
2585 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002586
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002587def server_params_test(client_context, server_context, indata=b"FOO\n",
2588 chatty=True, connectionchatty=False, sni_name=None,
2589 session=None):
2590 """
2591 Launch a server, connect a client to it and try various reads
2592 and writes.
2593 """
2594 stats = {}
2595 server = ThreadedEchoServer(context=server_context,
2596 chatty=chatty,
2597 connectionchatty=False)
2598 with server:
2599 with client_context.wrap_socket(socket.socket(),
2600 server_hostname=sni_name, session=session) as s:
2601 s.connect((HOST, server.port))
2602 for arg in [indata, bytearray(indata), memoryview(indata)]:
2603 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002604 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002605 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002606 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002607 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002608 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002609 if connectionchatty:
2610 if support.verbose:
2611 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002612 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002613 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002614 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2615 % (outdata[:20], len(outdata),
2616 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002617 s.write(b"over\n")
2618 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002619 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002620 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002621 stats.update({
2622 'compression': s.compression(),
2623 'cipher': s.cipher(),
2624 'peercert': s.getpeercert(),
2625 'client_alpn_protocol': s.selected_alpn_protocol(),
2626 'client_npn_protocol': s.selected_npn_protocol(),
2627 'version': s.version(),
2628 'session_reused': s.session_reused,
2629 'session': s.session,
2630 })
2631 s.close()
2632 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2633 stats['server_npn_protocols'] = server.selected_npn_protocols
2634 stats['server_shared_ciphers'] = server.shared_ciphers
2635 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002636
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002637def try_protocol_combo(server_protocol, client_protocol, expect_success,
2638 certsreqs=None, server_options=0, client_options=0):
2639 """
2640 Try to SSL-connect using *client_protocol* to *server_protocol*.
2641 If *expect_success* is true, assert that the connection succeeds,
2642 if it's false, assert that the connection fails.
2643 Also, if *expect_success* is a string, assert that it is the protocol
2644 version actually used by the connection.
2645 """
2646 if certsreqs is None:
2647 certsreqs = ssl.CERT_NONE
2648 certtype = {
2649 ssl.CERT_NONE: "CERT_NONE",
2650 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2651 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2652 }[certsreqs]
2653 if support.verbose:
2654 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2655 sys.stdout.write(formatstr %
2656 (ssl.get_protocol_name(client_protocol),
2657 ssl.get_protocol_name(server_protocol),
2658 certtype))
2659 client_context = ssl.SSLContext(client_protocol)
2660 client_context.options |= client_options
2661 server_context = ssl.SSLContext(server_protocol)
2662 server_context.options |= server_options
2663
Victor Stinner3ef63442019-02-19 18:06:03 +01002664 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2665 if (min_version is not None
2666 # SSLContext.minimum_version is only available on recent OpenSSL
2667 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2668 and hasattr(server_context, 'minimum_version')
2669 and server_protocol == ssl.PROTOCOL_TLS
2670 and server_context.minimum_version > min_version):
2671 # If OpenSSL configuration is strict and requires more recent TLS
2672 # version, we have to change the minimum to test old TLS versions.
2673 server_context.minimum_version = min_version
2674
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002675 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2676 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2677 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002678 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002679 client_context.set_ciphers("ALL")
2680
2681 for ctx in (client_context, server_context):
2682 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002683 ctx.load_cert_chain(SIGNED_CERTFILE)
2684 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002685 try:
2686 stats = server_params_test(client_context, server_context,
2687 chatty=False, connectionchatty=False)
2688 # Protocol mismatch can result in either an SSLError, or a
2689 # "Connection reset by peer" error.
2690 except ssl.SSLError:
2691 if expect_success:
2692 raise
2693 except OSError as e:
2694 if expect_success or e.errno != errno.ECONNRESET:
2695 raise
2696 else:
2697 if not expect_success:
2698 raise AssertionError(
2699 "Client protocol %s succeeded with server protocol %s!"
2700 % (ssl.get_protocol_name(client_protocol),
2701 ssl.get_protocol_name(server_protocol)))
2702 elif (expect_success is not True
2703 and expect_success != stats['version']):
2704 raise AssertionError("version mismatch: expected %r, got %r"
2705 % (expect_success, stats['version']))
2706
2707
2708class ThreadedTests(unittest.TestCase):
2709
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002710 def test_echo(self):
2711 """Basic test of an SSL client connecting to a server"""
2712 if support.verbose:
2713 sys.stdout.write("\n")
2714 for protocol in PROTOCOLS:
2715 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2716 continue
2717 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2718 context = ssl.SSLContext(protocol)
2719 context.load_cert_chain(CERTFILE)
2720 server_params_test(context, context,
2721 chatty=True, connectionchatty=True)
2722
Christian Heimesa170fa12017-09-15 20:27:30 +02002723 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002724
2725 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2726 server_params_test(client_context=client_context,
2727 server_context=server_context,
2728 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002729 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002730
2731 client_context.check_hostname = False
2732 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2733 with self.assertRaises(ssl.SSLError) as e:
2734 server_params_test(client_context=server_context,
2735 server_context=client_context,
2736 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002737 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002738 self.assertIn('called a function you should not call',
2739 str(e.exception))
2740
2741 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2742 with self.assertRaises(ssl.SSLError) as e:
2743 server_params_test(client_context=server_context,
2744 server_context=server_context,
2745 chatty=True, connectionchatty=True)
2746 self.assertIn('called a function you should not call',
2747 str(e.exception))
2748
2749 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2750 with self.assertRaises(ssl.SSLError) as e:
2751 server_params_test(client_context=server_context,
2752 server_context=client_context,
2753 chatty=True, connectionchatty=True)
2754 self.assertIn('called a function you should not call',
2755 str(e.exception))
2756
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002757 def test_getpeercert(self):
2758 if support.verbose:
2759 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002760
2761 client_context, server_context, hostname = testing_context()
2762 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002763 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002764 with client_context.wrap_socket(socket.socket(),
2765 do_handshake_on_connect=False,
2766 server_hostname=hostname) as s:
2767 s.connect((HOST, server.port))
2768 # getpeercert() raise ValueError while the handshake isn't
2769 # done.
2770 with self.assertRaises(ValueError):
2771 s.getpeercert()
2772 s.do_handshake()
2773 cert = s.getpeercert()
2774 self.assertTrue(cert, "Can't get peer certificate.")
2775 cipher = s.cipher()
2776 if support.verbose:
2777 sys.stdout.write(pprint.pformat(cert) + '\n')
2778 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2779 if 'subject' not in cert:
2780 self.fail("No subject field in certificate: %s." %
2781 pprint.pformat(cert))
2782 if ((('organizationName', 'Python Software Foundation'),)
2783 not in cert['subject']):
2784 self.fail(
2785 "Missing or invalid 'organizationName' field in certificate subject; "
2786 "should be 'Python Software Foundation'.")
2787 self.assertIn('notBefore', cert)
2788 self.assertIn('notAfter', cert)
2789 before = ssl.cert_time_to_seconds(cert['notBefore'])
2790 after = ssl.cert_time_to_seconds(cert['notAfter'])
2791 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002792
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002793 @unittest.skipUnless(have_verify_flags(),
2794 "verify_flags need OpenSSL > 0.9.8")
2795 def test_crl_check(self):
2796 if support.verbose:
2797 sys.stdout.write("\n")
2798
Christian Heimesa170fa12017-09-15 20:27:30 +02002799 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002800
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002801 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002802 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002803
2804 # VERIFY_DEFAULT should pass
2805 server = ThreadedEchoServer(context=server_context, chatty=True)
2806 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002807 with client_context.wrap_socket(socket.socket(),
2808 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002809 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002810 cert = s.getpeercert()
2811 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002812
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002813 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002814 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002815
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002816 server = ThreadedEchoServer(context=server_context, chatty=True)
2817 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002818 with client_context.wrap_socket(socket.socket(),
2819 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002820 with self.assertRaisesRegex(ssl.SSLError,
2821 "certificate verify failed"):
2822 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002823
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002825 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002826
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002827 server = ThreadedEchoServer(context=server_context, chatty=True)
2828 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002829 with client_context.wrap_socket(socket.socket(),
2830 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002831 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002832 cert = s.getpeercert()
2833 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002834
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002835 def test_check_hostname(self):
2836 if support.verbose:
2837 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002838
Christian Heimesa170fa12017-09-15 20:27:30 +02002839 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002840
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002841 # correct hostname should verify
2842 server = ThreadedEchoServer(context=server_context, chatty=True)
2843 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002844 with client_context.wrap_socket(socket.socket(),
2845 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002846 s.connect((HOST, server.port))
2847 cert = s.getpeercert()
2848 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002849
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002850 # incorrect hostname should raise an exception
2851 server = ThreadedEchoServer(context=server_context, chatty=True)
2852 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002853 with client_context.wrap_socket(socket.socket(),
2854 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002855 with self.assertRaisesRegex(
2856 ssl.CertificateError,
2857 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002858 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002859
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002860 # missing server_hostname arg should cause an exception, too
2861 server = ThreadedEchoServer(context=server_context, chatty=True)
2862 with server:
2863 with socket.socket() as s:
2864 with self.assertRaisesRegex(ValueError,
2865 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002866 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002867
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002868 def test_ecc_cert(self):
2869 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2870 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002871 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002872 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2873
2874 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2875 # load ECC cert
2876 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2877
2878 # correct hostname should verify
2879 server = ThreadedEchoServer(context=server_context, chatty=True)
2880 with server:
2881 with client_context.wrap_socket(socket.socket(),
2882 server_hostname=hostname) as s:
2883 s.connect((HOST, server.port))
2884 cert = s.getpeercert()
2885 self.assertTrue(cert, "Can't get peer certificate.")
2886 cipher = s.cipher()[0].split('-')
2887 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2888
2889 def test_dual_rsa_ecc(self):
2890 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2891 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002892 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2893 # algorithms.
2894 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002895 # only ECDSA certs
2896 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2897 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2898
2899 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2900 # load ECC and RSA key/cert pairs
2901 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2902 server_context.load_cert_chain(SIGNED_CERTFILE)
2903
2904 # correct hostname should verify
2905 server = ThreadedEchoServer(context=server_context, chatty=True)
2906 with server:
2907 with client_context.wrap_socket(socket.socket(),
2908 server_hostname=hostname) as s:
2909 s.connect((HOST, server.port))
2910 cert = s.getpeercert()
2911 self.assertTrue(cert, "Can't get peer certificate.")
2912 cipher = s.cipher()[0].split('-')
2913 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2914
Christian Heimes66e57422018-01-29 14:25:13 +01002915 def test_check_hostname_idn(self):
2916 if support.verbose:
2917 sys.stdout.write("\n")
2918
Christian Heimes11a14932018-02-24 02:35:08 +01002919 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002920 server_context.load_cert_chain(IDNSANSFILE)
2921
Christian Heimes11a14932018-02-24 02:35:08 +01002922 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002923 context.verify_mode = ssl.CERT_REQUIRED
2924 context.check_hostname = True
2925 context.load_verify_locations(SIGNING_CA)
2926
2927 # correct hostname should verify, when specified in several
2928 # different ways
2929 idn_hostnames = [
2930 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002931 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002932 ('xn--knig-5qa.idn.pythontest.net',
2933 'xn--knig-5qa.idn.pythontest.net'),
2934 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002935 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002936
2937 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002938 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002939 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2940 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2941 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002942 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2943
2944 # ('königsgäßchen.idna2008.pythontest.net',
2945 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2946 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2947 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2948 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2949 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2950
Christian Heimes66e57422018-01-29 14:25:13 +01002951 ]
2952 for server_hostname, expected_hostname in idn_hostnames:
2953 server = ThreadedEchoServer(context=server_context, chatty=True)
2954 with server:
2955 with context.wrap_socket(socket.socket(),
2956 server_hostname=server_hostname) as s:
2957 self.assertEqual(s.server_hostname, expected_hostname)
2958 s.connect((HOST, server.port))
2959 cert = s.getpeercert()
2960 self.assertEqual(s.server_hostname, expected_hostname)
2961 self.assertTrue(cert, "Can't get peer certificate.")
2962
Christian Heimes66e57422018-01-29 14:25:13 +01002963 # incorrect hostname should raise an exception
2964 server = ThreadedEchoServer(context=server_context, chatty=True)
2965 with server:
2966 with context.wrap_socket(socket.socket(),
2967 server_hostname="python.example.org") as s:
2968 with self.assertRaises(ssl.CertificateError):
2969 s.connect((HOST, server.port))
2970
Christian Heimes529525f2018-05-23 22:24:45 +02002971 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002972 """Connecting when the server rejects the client's certificate
2973
2974 Launch a server with CERT_REQUIRED, and check that trying to
2975 connect to it with a wrong client certificate fails.
2976 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002977 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002978 # load client cert that is not signed by trusted CA
2979 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002980 # require TLS client authentication
2981 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002982 # TLS 1.3 has different handshake
2983 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002984
2985 server = ThreadedEchoServer(
2986 context=server_context, chatty=True, connectionchatty=True,
2987 )
2988
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002989 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002990 client_context.wrap_socket(socket.socket(),
2991 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002992 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002993 # Expect either an SSL error about the server rejecting
2994 # the connection, or a low-level connection reset (which
2995 # sometimes happens on Windows)
2996 s.connect((HOST, server.port))
2997 except ssl.SSLError as e:
2998 if support.verbose:
2999 sys.stdout.write("\nSSLError is %r\n" % e)
3000 except OSError as e:
3001 if e.errno != errno.ECONNRESET:
3002 raise
3003 if support.verbose:
3004 sys.stdout.write("\nsocket.error is %r\n" % e)
3005 else:
3006 self.fail("Use of invalid cert should have failed!")
3007
Christian Heimes529525f2018-05-23 22:24:45 +02003008 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
3009 def test_wrong_cert_tls13(self):
3010 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02003011 # load client cert that is not signed by trusted CA
3012 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02003013 server_context.verify_mode = ssl.CERT_REQUIRED
3014 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
3015 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
3016
3017 server = ThreadedEchoServer(
3018 context=server_context, chatty=True, connectionchatty=True,
3019 )
3020 with server, \
3021 client_context.wrap_socket(socket.socket(),
3022 server_hostname=hostname) as s:
3023 # TLS 1.3 perform client cert exchange after handshake
3024 s.connect((HOST, server.port))
3025 try:
3026 s.write(b'data')
3027 s.read(4)
3028 except ssl.SSLError as e:
3029 if support.verbose:
3030 sys.stdout.write("\nSSLError is %r\n" % e)
3031 except OSError as e:
3032 if e.errno != errno.ECONNRESET:
3033 raise
3034 if support.verbose:
3035 sys.stdout.write("\nsocket.error is %r\n" % e)
3036 else:
3037 self.fail("Use of invalid cert should have failed!")
3038
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003039 def test_rude_shutdown(self):
3040 """A brutal shutdown of an SSL server should raise an OSError
3041 in the client when attempting handshake.
3042 """
3043 listener_ready = threading.Event()
3044 listener_gone = threading.Event()
3045
3046 s = socket.socket()
3047 port = support.bind_port(s, HOST)
3048
3049 # `listener` runs in a thread. It sits in an accept() until
3050 # the main thread connects. Then it rudely closes the socket,
3051 # and sets Event `listener_gone` to let the main thread know
3052 # the socket is gone.
3053 def listener():
3054 s.listen()
3055 listener_ready.set()
3056 newsock, addr = s.accept()
3057 newsock.close()
3058 s.close()
3059 listener_gone.set()
3060
3061 def connector():
3062 listener_ready.wait()
3063 with socket.socket() as c:
3064 c.connect((HOST, port))
3065 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003066 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003067 ssl_sock = test_wrap_socket(c)
3068 except OSError:
3069 pass
3070 else:
3071 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003072
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003073 t = threading.Thread(target=listener)
3074 t.start()
3075 try:
3076 connector()
3077 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003078 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003079
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003080 def test_ssl_cert_verify_error(self):
3081 if support.verbose:
3082 sys.stdout.write("\n")
3083
3084 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3085 server_context.load_cert_chain(SIGNED_CERTFILE)
3086
3087 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3088
3089 server = ThreadedEchoServer(context=server_context, chatty=True)
3090 with server:
3091 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003092 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003093 try:
3094 s.connect((HOST, server.port))
3095 except ssl.SSLError as e:
3096 msg = 'unable to get local issuer certificate'
3097 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3098 self.assertEqual(e.verify_code, 20)
3099 self.assertEqual(e.verify_message, msg)
3100 self.assertIn(msg, repr(e))
3101 self.assertIn('certificate verify failed', repr(e))
3102
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003103 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3104 "OpenSSL is compiled without SSLv2 support")
3105 def test_protocol_sslv2(self):
3106 """Connecting to an SSLv2 server with various client options"""
3107 if support.verbose:
3108 sys.stdout.write("\n")
3109 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3110 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3111 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003112 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003113 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3114 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3115 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3116 # SSLv23 client with specific SSL options
3117 if no_sslv2_implies_sslv3_hello():
3118 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003119 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003120 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003121 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003122 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003123 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003124 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003125
Christian Heimesa170fa12017-09-15 20:27:30 +02003126 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003127 """Connecting to an SSLv23 server with various client options"""
3128 if support.verbose:
3129 sys.stdout.write("\n")
3130 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003131 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003132 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003133 except OSError as x:
3134 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3135 if support.verbose:
3136 sys.stdout.write(
3137 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3138 % str(x))
3139 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003140 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3141 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3142 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003143
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003144 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003145 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3146 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3147 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003148
3149 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003150 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3151 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3152 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003153
3154 # Server with specific SSL options
3155 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003156 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003157 server_options=ssl.OP_NO_SSLv3)
3158 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003159 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003160 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003161 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003162 server_options=ssl.OP_NO_TLSv1)
3163
3164
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003165 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3166 "OpenSSL is compiled without SSLv3 support")
3167 def test_protocol_sslv3(self):
3168 """Connecting to an SSLv3 server with various client options"""
3169 if support.verbose:
3170 sys.stdout.write("\n")
3171 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3172 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3173 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3174 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3175 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003176 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003177 client_options=ssl.OP_NO_SSLv3)
3178 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3179 if no_sslv2_implies_sslv3_hello():
3180 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003181 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003182 False, client_options=ssl.OP_NO_SSLv2)
3183
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003184 def test_protocol_tlsv1(self):
3185 """Connecting to a TLSv1 server with various client options"""
3186 if support.verbose:
3187 sys.stdout.write("\n")
3188 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3189 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3190 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3191 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3192 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3193 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3194 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003195 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003196 client_options=ssl.OP_NO_TLSv1)
3197
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003198 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3199 "TLS version 1.1 not supported.")
3200 def test_protocol_tlsv1_1(self):
3201 """Connecting to a TLSv1.1 server with various client options.
3202 Testing against older TLS versions."""
3203 if support.verbose:
3204 sys.stdout.write("\n")
3205 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3206 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3207 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3208 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3209 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003210 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003211 client_options=ssl.OP_NO_TLSv1_1)
3212
Christian Heimesa170fa12017-09-15 20:27:30 +02003213 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003214 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3215 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3216
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003217 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3218 "TLS version 1.2 not supported.")
3219 def test_protocol_tlsv1_2(self):
3220 """Connecting to a TLSv1.2 server with various client options.
3221 Testing against older TLS versions."""
3222 if support.verbose:
3223 sys.stdout.write("\n")
3224 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3225 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3226 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3227 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3228 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3229 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3230 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003231 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003232 client_options=ssl.OP_NO_TLSv1_2)
3233
Christian Heimesa170fa12017-09-15 20:27:30 +02003234 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003235 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3236 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3237 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3238 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3239
3240 def test_starttls(self):
3241 """Switching from clear text to encrypted and back again."""
3242 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3243
3244 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003245 starttls_server=True,
3246 chatty=True,
3247 connectionchatty=True)
3248 wrapped = False
3249 with server:
3250 s = socket.socket()
3251 s.setblocking(1)
3252 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003253 if support.verbose:
3254 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003255 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003256 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003257 sys.stdout.write(
3258 " client: sending %r...\n" % indata)
3259 if wrapped:
3260 conn.write(indata)
3261 outdata = conn.read()
3262 else:
3263 s.send(indata)
3264 outdata = s.recv(1024)
3265 msg = outdata.strip().lower()
3266 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3267 # STARTTLS ok, switch to secure mode
3268 if support.verbose:
3269 sys.stdout.write(
3270 " client: read %r from server, starting TLS...\n"
3271 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003272 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003273 wrapped = True
3274 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3275 # ENDTLS ok, switch back to clear text
3276 if support.verbose:
3277 sys.stdout.write(
3278 " client: read %r from server, ending TLS...\n"
3279 % msg)
3280 s = conn.unwrap()
3281 wrapped = False
3282 else:
3283 if support.verbose:
3284 sys.stdout.write(
3285 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003286 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003287 sys.stdout.write(" client: closing connection.\n")
3288 if wrapped:
3289 conn.write(b"over\n")
3290 else:
3291 s.send(b"over\n")
3292 if wrapped:
3293 conn.close()
3294 else:
3295 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003296
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003297 def test_socketserver(self):
3298 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003299 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003300 # try to connect
3301 if support.verbose:
3302 sys.stdout.write('\n')
3303 with open(CERTFILE, 'rb') as f:
3304 d1 = f.read()
3305 d2 = ''
3306 # now fetch the same data from the HTTPS server
3307 url = 'https://localhost:%d/%s' % (
3308 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003309 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 f = urllib.request.urlopen(url, context=context)
3311 try:
3312 dlen = f.info().get("content-length")
3313 if dlen and (int(dlen) > 0):
3314 d2 = f.read(int(dlen))
3315 if support.verbose:
3316 sys.stdout.write(
3317 " client: read %d bytes from remote server '%s'\n"
3318 % (len(d2), server))
3319 finally:
3320 f.close()
3321 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003322
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003323 def test_asyncore_server(self):
3324 """Check the example asyncore integration."""
3325 if support.verbose:
3326 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003327
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003328 indata = b"FOO\n"
3329 server = AsyncoreEchoServer(CERTFILE)
3330 with server:
3331 s = test_wrap_socket(socket.socket())
3332 s.connect(('127.0.0.1', server.port))
3333 if support.verbose:
3334 sys.stdout.write(
3335 " client: sending %r...\n" % indata)
3336 s.write(indata)
3337 outdata = s.read()
3338 if support.verbose:
3339 sys.stdout.write(" client: read %r\n" % outdata)
3340 if outdata != indata.lower():
3341 self.fail(
3342 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3343 % (outdata[:20], len(outdata),
3344 indata[:20].lower(), len(indata)))
3345 s.write(b"over\n")
3346 if support.verbose:
3347 sys.stdout.write(" client: closing connection.\n")
3348 s.close()
3349 if support.verbose:
3350 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003351
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003352 def test_recv_send(self):
3353 """Test recv(), send() and friends."""
3354 if support.verbose:
3355 sys.stdout.write("\n")
3356
3357 server = ThreadedEchoServer(CERTFILE,
3358 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003359 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003360 cacerts=CERTFILE,
3361 chatty=True,
3362 connectionchatty=False)
3363 with server:
3364 s = test_wrap_socket(socket.socket(),
3365 server_side=False,
3366 certfile=CERTFILE,
3367 ca_certs=CERTFILE,
3368 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003369 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003370 s.connect((HOST, server.port))
3371 # helper methods for standardising recv* method signatures
3372 def _recv_into():
3373 b = bytearray(b"\0"*100)
3374 count = s.recv_into(b)
3375 return b[:count]
3376
3377 def _recvfrom_into():
3378 b = bytearray(b"\0"*100)
3379 count, addr = s.recvfrom_into(b)
3380 return b[:count]
3381
3382 # (name, method, expect success?, *args, return value func)
3383 send_methods = [
3384 ('send', s.send, True, [], len),
3385 ('sendto', s.sendto, False, ["some.address"], len),
3386 ('sendall', s.sendall, True, [], lambda x: None),
3387 ]
3388 # (name, method, whether to expect success, *args)
3389 recv_methods = [
3390 ('recv', s.recv, True, []),
3391 ('recvfrom', s.recvfrom, False, ["some.address"]),
3392 ('recv_into', _recv_into, True, []),
3393 ('recvfrom_into', _recvfrom_into, False, []),
3394 ]
3395 data_prefix = "PREFIX_"
3396
3397 for (meth_name, send_meth, expect_success, args,
3398 ret_val_meth) in send_methods:
3399 indata = (data_prefix + meth_name).encode('ascii')
3400 try:
3401 ret = send_meth(indata, *args)
3402 msg = "sending with {}".format(meth_name)
3403 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3404 outdata = s.read()
3405 if outdata != indata.lower():
3406 self.fail(
3407 "While sending with <<{name:s}>> bad data "
3408 "<<{outdata:r}>> ({nout:d}) received; "
3409 "expected <<{indata:r}>> ({nin:d})\n".format(
3410 name=meth_name, outdata=outdata[:20],
3411 nout=len(outdata),
3412 indata=indata[:20], nin=len(indata)
3413 )
3414 )
3415 except ValueError as e:
3416 if expect_success:
3417 self.fail(
3418 "Failed to send with method <<{name:s}>>; "
3419 "expected to succeed.\n".format(name=meth_name)
3420 )
3421 if not str(e).startswith(meth_name):
3422 self.fail(
3423 "Method <<{name:s}>> failed with unexpected "
3424 "exception message: {exp:s}\n".format(
3425 name=meth_name, exp=e
3426 )
3427 )
3428
3429 for meth_name, recv_meth, expect_success, args in recv_methods:
3430 indata = (data_prefix + meth_name).encode('ascii')
3431 try:
3432 s.send(indata)
3433 outdata = recv_meth(*args)
3434 if outdata != indata.lower():
3435 self.fail(
3436 "While receiving with <<{name:s}>> bad data "
3437 "<<{outdata:r}>> ({nout:d}) received; "
3438 "expected <<{indata:r}>> ({nin:d})\n".format(
3439 name=meth_name, outdata=outdata[:20],
3440 nout=len(outdata),
3441 indata=indata[:20], nin=len(indata)
3442 )
3443 )
3444 except ValueError as e:
3445 if expect_success:
3446 self.fail(
3447 "Failed to receive with method <<{name:s}>>; "
3448 "expected to succeed.\n".format(name=meth_name)
3449 )
3450 if not str(e).startswith(meth_name):
3451 self.fail(
3452 "Method <<{name:s}>> failed with unexpected "
3453 "exception message: {exp:s}\n".format(
3454 name=meth_name, exp=e
3455 )
3456 )
3457 # consume data
3458 s.read()
3459
3460 # read(-1, buffer) is supported, even though read(-1) is not
3461 data = b"data"
3462 s.send(data)
3463 buffer = bytearray(len(data))
3464 self.assertEqual(s.read(-1, buffer), len(data))
3465 self.assertEqual(buffer, data)
3466
Christian Heimes888bbdc2017-09-07 14:18:21 -07003467 # sendall accepts bytes-like objects
3468 if ctypes is not None:
3469 ubyte = ctypes.c_ubyte * len(data)
3470 byteslike = ubyte.from_buffer_copy(data)
3471 s.sendall(byteslike)
3472 self.assertEqual(s.read(), data)
3473
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003474 # Make sure sendmsg et al are disallowed to avoid
3475 # inadvertent disclosure of data and/or corruption
3476 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003477 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003478 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3479 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3480 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003481 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003482 s.write(b"over\n")
3483
3484 self.assertRaises(ValueError, s.recv, -1)
3485 self.assertRaises(ValueError, s.read, -1)
3486
3487 s.close()
3488
3489 def test_recv_zero(self):
3490 server = ThreadedEchoServer(CERTFILE)
3491 server.__enter__()
3492 self.addCleanup(server.__exit__, None, None)
3493 s = socket.create_connection((HOST, server.port))
3494 self.addCleanup(s.close)
3495 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3496 self.addCleanup(s.close)
3497
3498 # recv/read(0) should return no data
3499 s.send(b"data")
3500 self.assertEqual(s.recv(0), b"")
3501 self.assertEqual(s.read(0), b"")
3502 self.assertEqual(s.read(), b"data")
3503
3504 # Should not block if the other end sends no data
3505 s.setblocking(False)
3506 self.assertEqual(s.recv(0), b"")
3507 self.assertEqual(s.recv_into(bytearray()), 0)
3508
3509 def test_nonblocking_send(self):
3510 server = ThreadedEchoServer(CERTFILE,
3511 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003512 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003513 cacerts=CERTFILE,
3514 chatty=True,
3515 connectionchatty=False)
3516 with server:
3517 s = test_wrap_socket(socket.socket(),
3518 server_side=False,
3519 certfile=CERTFILE,
3520 ca_certs=CERTFILE,
3521 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003522 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003523 s.connect((HOST, server.port))
3524 s.setblocking(False)
3525
3526 # If we keep sending data, at some point the buffers
3527 # will be full and the call will block
3528 buf = bytearray(8192)
3529 def fill_buffer():
3530 while True:
3531 s.send(buf)
3532 self.assertRaises((ssl.SSLWantWriteError,
3533 ssl.SSLWantReadError), fill_buffer)
3534
3535 # Now read all the output and discard it
3536 s.setblocking(True)
3537 s.close()
3538
3539 def test_handshake_timeout(self):
3540 # Issue #5103: SSL handshake must respect the socket timeout
3541 server = socket.socket(socket.AF_INET)
3542 host = "127.0.0.1"
3543 port = support.bind_port(server)
3544 started = threading.Event()
3545 finish = False
3546
3547 def serve():
3548 server.listen()
3549 started.set()
3550 conns = []
3551 while not finish:
3552 r, w, e = select.select([server], [], [], 0.1)
3553 if server in r:
3554 # Let the socket hang around rather than having
3555 # it closed by garbage collection.
3556 conns.append(server.accept()[0])
3557 for sock in conns:
3558 sock.close()
3559
3560 t = threading.Thread(target=serve)
3561 t.start()
3562 started.wait()
3563
3564 try:
3565 try:
3566 c = socket.socket(socket.AF_INET)
3567 c.settimeout(0.2)
3568 c.connect((host, port))
3569 # Will attempt handshake and time out
3570 self.assertRaisesRegex(socket.timeout, "timed out",
3571 test_wrap_socket, c)
3572 finally:
3573 c.close()
3574 try:
3575 c = socket.socket(socket.AF_INET)
3576 c = test_wrap_socket(c)
3577 c.settimeout(0.2)
3578 # Will attempt handshake and time out
3579 self.assertRaisesRegex(socket.timeout, "timed out",
3580 c.connect, (host, port))
3581 finally:
3582 c.close()
3583 finally:
3584 finish = True
3585 t.join()
3586 server.close()
3587
3588 def test_server_accept(self):
3589 # Issue #16357: accept() on a SSLSocket created through
3590 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003591 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003592 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003593 context.load_verify_locations(SIGNING_CA)
3594 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003595 server = socket.socket(socket.AF_INET)
3596 host = "127.0.0.1"
3597 port = support.bind_port(server)
3598 server = context.wrap_socket(server, server_side=True)
3599 self.assertTrue(server.server_side)
3600
3601 evt = threading.Event()
3602 remote = None
3603 peer = None
3604 def serve():
3605 nonlocal remote, peer
3606 server.listen()
3607 # Block on the accept and wait on the connection to close.
3608 evt.set()
3609 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003610 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003611
3612 t = threading.Thread(target=serve)
3613 t.start()
3614 # Client wait until server setup and perform a connect.
3615 evt.wait()
3616 client = context.wrap_socket(socket.socket())
3617 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003618 client.send(b'data')
3619 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003620 client_addr = client.getsockname()
3621 client.close()
3622 t.join()
3623 remote.close()
3624 server.close()
3625 # Sanity checks.
3626 self.assertIsInstance(remote, ssl.SSLSocket)
3627 self.assertEqual(peer, client_addr)
3628
3629 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003630 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003631 with context.wrap_socket(socket.socket()) as sock:
3632 with self.assertRaises(OSError) as cm:
3633 sock.getpeercert()
3634 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3635
3636 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003637 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003638 with context.wrap_socket(socket.socket()) as sock:
3639 with self.assertRaises(OSError) as cm:
3640 sock.do_handshake()
3641 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3642
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003643 def test_no_shared_ciphers(self):
3644 client_context, server_context, hostname = testing_context()
3645 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3646 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003647 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003648 client_context.set_ciphers("AES128")
3649 server_context.set_ciphers("AES256")
3650 with ThreadedEchoServer(context=server_context) as server:
3651 with client_context.wrap_socket(socket.socket(),
3652 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003653 with self.assertRaises(OSError):
3654 s.connect((HOST, server.port))
3655 self.assertIn("no shared cipher", server.conn_errors[0])
3656
3657 def test_version_basic(self):
3658 """
3659 Basic tests for SSLSocket.version().
3660 More tests are done in the test_protocol_*() methods.
3661 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003662 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3663 context.check_hostname = False
3664 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003665 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003666 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003667 chatty=False) as server:
3668 with context.wrap_socket(socket.socket()) as s:
3669 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003670 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003671 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003672 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003673 self.assertEqual(s.version(), 'TLSv1.3')
3674 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003675 self.assertEqual(s.version(), 'TLSv1.2')
3676 else: # 0.9.8 to 1.0.1
3677 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003678 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003679 self.assertIs(s.version(), None)
3680
Christian Heimescb5b68a2017-09-07 18:07:00 -07003681 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3682 "test requires TLSv1.3 enabled OpenSSL")
3683 def test_tls1_3(self):
3684 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3685 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003686 context.options |= (
3687 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3688 )
3689 with ThreadedEchoServer(context=context) as server:
3690 with context.wrap_socket(socket.socket()) as s:
3691 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003692 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003693 'TLS_AES_256_GCM_SHA384',
3694 'TLS_CHACHA20_POLY1305_SHA256',
3695 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003696 })
3697 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003698
Christian Heimes698dde12018-02-27 11:54:43 +01003699 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3700 "required OpenSSL 1.1.0g")
3701 def test_min_max_version(self):
3702 client_context, server_context, hostname = testing_context()
3703 # client TLSv1.0 to 1.2
3704 client_context.minimum_version = ssl.TLSVersion.TLSv1
3705 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3706 # server only TLSv1.2
3707 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3708 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3709
3710 with ThreadedEchoServer(context=server_context) as server:
3711 with client_context.wrap_socket(socket.socket(),
3712 server_hostname=hostname) as s:
3713 s.connect((HOST, server.port))
3714 self.assertEqual(s.version(), 'TLSv1.2')
3715
3716 # client 1.0 to 1.2, server 1.0 to 1.1
3717 server_context.minimum_version = ssl.TLSVersion.TLSv1
3718 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3719
3720 with ThreadedEchoServer(context=server_context) as server:
3721 with client_context.wrap_socket(socket.socket(),
3722 server_hostname=hostname) as s:
3723 s.connect((HOST, server.port))
3724 self.assertEqual(s.version(), 'TLSv1.1')
3725
3726 # client 1.0, server 1.2 (mismatch)
3727 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3728 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimese35d1ba2019-06-03 20:40:15 +02003729 client_context.maximum_version = ssl.TLSVersion.TLSv1
Christian Heimes698dde12018-02-27 11:54:43 +01003730 client_context.maximum_version = ssl.TLSVersion.TLSv1
3731 with ThreadedEchoServer(context=server_context) as server:
3732 with client_context.wrap_socket(socket.socket(),
3733 server_hostname=hostname) as s:
3734 with self.assertRaises(ssl.SSLError) as e:
3735 s.connect((HOST, server.port))
3736 self.assertIn("alert", str(e.exception))
3737
3738
3739 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3740 "required OpenSSL 1.1.0g")
3741 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3742 def test_min_max_version_sslv3(self):
3743 client_context, server_context, hostname = testing_context()
3744 server_context.minimum_version = ssl.TLSVersion.SSLv3
3745 client_context.minimum_version = ssl.TLSVersion.SSLv3
3746 client_context.maximum_version = ssl.TLSVersion.SSLv3
3747 with ThreadedEchoServer(context=server_context) as server:
3748 with client_context.wrap_socket(socket.socket(),
3749 server_hostname=hostname) as s:
3750 s.connect((HOST, server.port))
3751 self.assertEqual(s.version(), 'SSLv3')
3752
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003753 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3754 def test_default_ecdh_curve(self):
3755 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3756 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003757 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003758 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003759 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3760 # cipher name.
3761 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003762 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3763 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3764 # our default cipher list should prefer ECDH-based ciphers
3765 # automatically.
3766 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3767 context.set_ciphers("ECCdraft:ECDH")
3768 with ThreadedEchoServer(context=context) as server:
3769 with context.wrap_socket(socket.socket()) as s:
3770 s.connect((HOST, server.port))
3771 self.assertIn("ECDH", s.cipher()[0])
3772
3773 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3774 "'tls-unique' channel binding not available")
3775 def test_tls_unique_channel_binding(self):
3776 """Test tls-unique channel binding."""
3777 if support.verbose:
3778 sys.stdout.write("\n")
3779
Christian Heimes05d9fe32018-02-27 08:55:39 +01003780 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003781
3782 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003783 chatty=True,
3784 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003785
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003786 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003787 with client_context.wrap_socket(
3788 socket.socket(),
3789 server_hostname=hostname) as s:
3790 s.connect((HOST, server.port))
3791 # get the data
3792 cb_data = s.get_channel_binding("tls-unique")
3793 if support.verbose:
3794 sys.stdout.write(
3795 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003796
Christian Heimes05d9fe32018-02-27 08:55:39 +01003797 # check if it is sane
3798 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003799 if s.version() == 'TLSv1.3':
3800 self.assertEqual(len(cb_data), 48)
3801 else:
3802 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003803
Christian Heimes05d9fe32018-02-27 08:55:39 +01003804 # and compare with the peers version
3805 s.write(b"CB tls-unique\n")
3806 peer_data_repr = s.read().strip()
3807 self.assertEqual(peer_data_repr,
3808 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003809
3810 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003811 with client_context.wrap_socket(
3812 socket.socket(),
3813 server_hostname=hostname) as s:
3814 s.connect((HOST, server.port))
3815 new_cb_data = s.get_channel_binding("tls-unique")
3816 if support.verbose:
3817 sys.stdout.write(
3818 "got another channel binding data: {0!r}\n".format(
3819 new_cb_data)
3820 )
3821 # is it really unique
3822 self.assertNotEqual(cb_data, new_cb_data)
3823 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003824 if s.version() == 'TLSv1.3':
3825 self.assertEqual(len(cb_data), 48)
3826 else:
3827 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003828 s.write(b"CB tls-unique\n")
3829 peer_data_repr = s.read().strip()
3830 self.assertEqual(peer_data_repr,
3831 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003832
3833 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003834 client_context, server_context, hostname = testing_context()
3835 stats = server_params_test(client_context, server_context,
3836 chatty=True, connectionchatty=True,
3837 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003838 if support.verbose:
3839 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3840 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3841
3842 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3843 "ssl.OP_NO_COMPRESSION needed for this test")
3844 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003845 client_context, server_context, hostname = testing_context()
3846 client_context.options |= ssl.OP_NO_COMPRESSION
3847 server_context.options |= ssl.OP_NO_COMPRESSION
3848 stats = server_params_test(client_context, server_context,
3849 chatty=True, connectionchatty=True,
3850 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003851 self.assertIs(stats['compression'], None)
3852
Paul Monsonf3550692019-06-19 13:09:54 -07003853 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003854 def test_dh_params(self):
3855 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003856 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003857 # test scenario needs TLS <= 1.2
3858 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003859 server_context.load_dh_params(DHFILE)
3860 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003861 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003862 stats = server_params_test(client_context, server_context,
3863 chatty=True, connectionchatty=True,
3864 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003865 cipher = stats["cipher"][0]
3866 parts = cipher.split("-")
3867 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3868 self.fail("Non-DH cipher: " + cipher[0])
3869
Christian Heimesb7b92252018-02-25 09:49:31 +01003870 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003871 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003872 def test_ecdh_curve(self):
3873 # server secp384r1, client auto
3874 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003875
Christian Heimesb7b92252018-02-25 09:49:31 +01003876 server_context.set_ecdh_curve("secp384r1")
3877 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3878 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3879 stats = server_params_test(client_context, server_context,
3880 chatty=True, connectionchatty=True,
3881 sni_name=hostname)
3882
3883 # server auto, client secp384r1
3884 client_context, server_context, hostname = testing_context()
3885 client_context.set_ecdh_curve("secp384r1")
3886 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3887 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3888 stats = server_params_test(client_context, server_context,
3889 chatty=True, connectionchatty=True,
3890 sni_name=hostname)
3891
3892 # server / client curve mismatch
3893 client_context, server_context, hostname = testing_context()
3894 client_context.set_ecdh_curve("prime256v1")
3895 server_context.set_ecdh_curve("secp384r1")
3896 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3897 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3898 try:
3899 stats = server_params_test(client_context, server_context,
3900 chatty=True, connectionchatty=True,
3901 sni_name=hostname)
3902 except ssl.SSLError:
3903 pass
3904 else:
3905 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003906 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003907 self.fail("mismatch curve did not fail")
3908
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003909 def test_selected_alpn_protocol(self):
3910 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003911 client_context, server_context, hostname = testing_context()
3912 stats = server_params_test(client_context, server_context,
3913 chatty=True, connectionchatty=True,
3914 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003915 self.assertIs(stats['client_alpn_protocol'], None)
3916
3917 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3918 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3919 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003920 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003921 server_context.set_alpn_protocols(['foo', 'bar'])
3922 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003923 chatty=True, connectionchatty=True,
3924 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003925 self.assertIs(stats['client_alpn_protocol'], None)
3926
3927 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3928 def test_alpn_protocols(self):
3929 server_protocols = ['foo', 'bar', 'milkshake']
3930 protocol_tests = [
3931 (['foo', 'bar'], 'foo'),
3932 (['bar', 'foo'], 'foo'),
3933 (['milkshake'], 'milkshake'),
3934 (['http/3.0', 'http/4.0'], None)
3935 ]
3936 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003937 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003938 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 client_context.set_alpn_protocols(client_protocols)
3940
3941 try:
3942 stats = server_params_test(client_context,
3943 server_context,
3944 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003945 connectionchatty=True,
3946 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003947 except ssl.SSLError as e:
3948 stats = e
3949
Christian Heimes05d9fe32018-02-27 08:55:39 +01003950 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003951 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3952 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3953 self.assertIsInstance(stats, ssl.SSLError)
3954 else:
3955 msg = "failed trying %s (s) and %s (c).\n" \
3956 "was expecting %s, but got %%s from the %%s" \
3957 % (str(server_protocols), str(client_protocols),
3958 str(expected))
3959 client_result = stats['client_alpn_protocol']
3960 self.assertEqual(client_result, expected,
3961 msg % (client_result, "client"))
3962 server_result = stats['server_alpn_protocols'][-1] \
3963 if len(stats['server_alpn_protocols']) else 'nothing'
3964 self.assertEqual(server_result, expected,
3965 msg % (server_result, "server"))
3966
3967 def test_selected_npn_protocol(self):
3968 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003969 client_context, server_context, hostname = testing_context()
3970 stats = server_params_test(client_context, server_context,
3971 chatty=True, connectionchatty=True,
3972 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003973 self.assertIs(stats['client_npn_protocol'], None)
3974
3975 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3976 def test_npn_protocols(self):
3977 server_protocols = ['http/1.1', 'spdy/2']
3978 protocol_tests = [
3979 (['http/1.1', 'spdy/2'], 'http/1.1'),
3980 (['spdy/2', 'http/1.1'], 'http/1.1'),
3981 (['spdy/2', 'test'], 'spdy/2'),
3982 (['abc', 'def'], 'abc')
3983 ]
3984 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003985 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003986 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003987 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003988 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003989 chatty=True, connectionchatty=True,
3990 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003991 msg = "failed trying %s (s) and %s (c).\n" \
3992 "was expecting %s, but got %%s from the %%s" \
3993 % (str(server_protocols), str(client_protocols),
3994 str(expected))
3995 client_result = stats['client_npn_protocol']
3996 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3997 server_result = stats['server_npn_protocols'][-1] \
3998 if len(stats['server_npn_protocols']) else 'nothing'
3999 self.assertEqual(server_result, expected, msg % (server_result, "server"))
4000
4001 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004002 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004003 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02004004 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004005 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02004006 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004007 client_context.load_verify_locations(SIGNING_CA)
4008 return server_context, other_context, client_context
4009
4010 def check_common_name(self, stats, name):
4011 cert = stats['peercert']
4012 self.assertIn((('commonName', name),), cert['subject'])
4013
4014 @needs_sni
4015 def test_sni_callback(self):
4016 calls = []
4017 server_context, other_context, client_context = self.sni_contexts()
4018
Christian Heimesa170fa12017-09-15 20:27:30 +02004019 client_context.check_hostname = False
4020
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004021 def servername_cb(ssl_sock, server_name, initial_context):
4022 calls.append((server_name, initial_context))
4023 if server_name is not None:
4024 ssl_sock.context = other_context
4025 server_context.set_servername_callback(servername_cb)
4026
4027 stats = server_params_test(client_context, server_context,
4028 chatty=True,
4029 sni_name='supermessage')
4030 # The hostname was fetched properly, and the certificate was
4031 # changed for the connection.
4032 self.assertEqual(calls, [("supermessage", server_context)])
4033 # CERTFILE4 was selected
4034 self.check_common_name(stats, 'fakehostname')
4035
4036 calls = []
4037 # The callback is called with server_name=None
4038 stats = server_params_test(client_context, server_context,
4039 chatty=True,
4040 sni_name=None)
4041 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004042 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004043
4044 # Check disabling the callback
4045 calls = []
4046 server_context.set_servername_callback(None)
4047
4048 stats = server_params_test(client_context, server_context,
4049 chatty=True,
4050 sni_name='notfunny')
4051 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004052 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004053 self.assertEqual(calls, [])
4054
4055 @needs_sni
4056 def test_sni_callback_alert(self):
4057 # Returning a TLS alert is reflected to the connecting client
4058 server_context, other_context, client_context = self.sni_contexts()
4059
4060 def cb_returning_alert(ssl_sock, server_name, initial_context):
4061 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4062 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004063 with self.assertRaises(ssl.SSLError) as cm:
4064 stats = server_params_test(client_context, server_context,
4065 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004066 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004067 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004068
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004069 @needs_sni
4070 def test_sni_callback_raising(self):
4071 # Raising fails the connection with a TLS handshake failure alert.
4072 server_context, other_context, client_context = self.sni_contexts()
4073
4074 def cb_raising(ssl_sock, server_name, initial_context):
4075 1/0
4076 server_context.set_servername_callback(cb_raising)
4077
Victor Stinner00253502019-06-03 03:51:43 +02004078 with support.catch_unraisable_exception() as catch:
4079 with self.assertRaises(ssl.SSLError) as cm:
4080 stats = server_params_test(client_context, server_context,
4081 chatty=False,
4082 sni_name='supermessage')
4083
4084 self.assertEqual(cm.exception.reason,
4085 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4086 self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004087
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004088 @needs_sni
4089 def test_sni_callback_wrong_return_type(self):
4090 # Returning the wrong return type terminates the TLS connection
4091 # with an internal error alert.
4092 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004093
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004094 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4095 return "foo"
4096 server_context.set_servername_callback(cb_wrong_return_type)
4097
Victor Stinner00253502019-06-03 03:51:43 +02004098 with support.catch_unraisable_exception() as catch:
4099 with self.assertRaises(ssl.SSLError) as cm:
4100 stats = server_params_test(client_context, server_context,
4101 chatty=False,
4102 sni_name='supermessage')
4103
4104
4105 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4106 self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004107
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004108 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004109 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004110 client_context.set_ciphers("AES128:AES256")
4111 server_context.set_ciphers("AES256")
4112 expected_algs = [
4113 "AES256", "AES-256",
4114 # TLS 1.3 ciphers are always enabled
4115 "TLS_CHACHA20", "TLS_AES",
4116 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004117
Christian Heimesa170fa12017-09-15 20:27:30 +02004118 stats = server_params_test(client_context, server_context,
4119 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004120 ciphers = stats['server_shared_ciphers'][0]
4121 self.assertGreater(len(ciphers), 0)
4122 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004123 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004124 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004125
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004126 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004127 client_context, server_context, hostname = testing_context()
4128 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004129
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004130 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004131 s = client_context.wrap_socket(socket.socket(),
4132 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004133 s.connect((HOST, server.port))
4134 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004135
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004136 self.assertRaises(ValueError, s.read, 1024)
4137 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004138
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004139 def test_sendfile(self):
4140 TEST_DATA = b"x" * 512
4141 with open(support.TESTFN, 'wb') as f:
4142 f.write(TEST_DATA)
4143 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004144 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004145 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004146 context.load_verify_locations(SIGNING_CA)
4147 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004148 server = ThreadedEchoServer(context=context, chatty=False)
4149 with server:
4150 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004151 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004152 with open(support.TESTFN, 'rb') as file:
4153 s.sendfile(file)
4154 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004155
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004156 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004157 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004158 # TODO: sessions aren't compatible with TLSv1.3 yet
4159 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004160
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004161 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004162 stats = server_params_test(client_context, server_context,
4163 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004164 session = stats['session']
4165 self.assertTrue(session.id)
4166 self.assertGreater(session.time, 0)
4167 self.assertGreater(session.timeout, 0)
4168 self.assertTrue(session.has_ticket)
4169 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4170 self.assertGreater(session.ticket_lifetime_hint, 0)
4171 self.assertFalse(stats['session_reused'])
4172 sess_stat = server_context.session_stats()
4173 self.assertEqual(sess_stat['accept'], 1)
4174 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004175
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004176 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004177 stats = server_params_test(client_context, server_context,
4178 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004179 sess_stat = server_context.session_stats()
4180 self.assertEqual(sess_stat['accept'], 2)
4181 self.assertEqual(sess_stat['hits'], 1)
4182 self.assertTrue(stats['session_reused'])
4183 session2 = stats['session']
4184 self.assertEqual(session2.id, session.id)
4185 self.assertEqual(session2, session)
4186 self.assertIsNot(session2, session)
4187 self.assertGreaterEqual(session2.time, session.time)
4188 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004190 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004191 stats = server_params_test(client_context, server_context,
4192 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004193 self.assertFalse(stats['session_reused'])
4194 session3 = stats['session']
4195 self.assertNotEqual(session3.id, session.id)
4196 self.assertNotEqual(session3, session)
4197 sess_stat = server_context.session_stats()
4198 self.assertEqual(sess_stat['accept'], 3)
4199 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004200
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004201 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004202 stats = server_params_test(client_context, server_context,
4203 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004204 self.assertTrue(stats['session_reused'])
4205 session4 = stats['session']
4206 self.assertEqual(session4.id, session.id)
4207 self.assertEqual(session4, session)
4208 self.assertGreaterEqual(session4.time, session.time)
4209 self.assertGreaterEqual(session4.timeout, session.timeout)
4210 sess_stat = server_context.session_stats()
4211 self.assertEqual(sess_stat['accept'], 4)
4212 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004213
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004214 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004215 client_context, server_context, hostname = testing_context()
4216 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004217
Christian Heimes05d9fe32018-02-27 08:55:39 +01004218 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004219 client_context.options |= ssl.OP_NO_TLSv1_3
4220 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004221
Christian Heimesa170fa12017-09-15 20:27:30 +02004222 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004223 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004224 with client_context.wrap_socket(socket.socket(),
4225 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004226 # session is None before handshake
4227 self.assertEqual(s.session, None)
4228 self.assertEqual(s.session_reused, None)
4229 s.connect((HOST, server.port))
4230 session = s.session
4231 self.assertTrue(session)
4232 with self.assertRaises(TypeError) as e:
4233 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004234 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004235
Christian Heimesa170fa12017-09-15 20:27:30 +02004236 with client_context.wrap_socket(socket.socket(),
4237 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004238 s.connect((HOST, server.port))
4239 # cannot set session after handshake
4240 with self.assertRaises(ValueError) as e:
4241 s.session = session
4242 self.assertEqual(str(e.exception),
4243 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004244
Christian Heimesa170fa12017-09-15 20:27:30 +02004245 with client_context.wrap_socket(socket.socket(),
4246 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004247 # can set session before handshake and before the
4248 # connection was established
4249 s.session = session
4250 s.connect((HOST, server.port))
4251 self.assertEqual(s.session.id, session.id)
4252 self.assertEqual(s.session, session)
4253 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004254
Christian Heimesa170fa12017-09-15 20:27:30 +02004255 with client_context2.wrap_socket(socket.socket(),
4256 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004257 # cannot re-use session with a different SSLContext
4258 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004259 s.session = session
4260 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004261 self.assertEqual(str(e.exception),
4262 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004263
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004264
Christian Heimes9fb051f2018-09-23 08:32:31 +02004265@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4266class TestPostHandshakeAuth(unittest.TestCase):
4267 def test_pha_setter(self):
4268 protocols = [
4269 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4270 ]
4271 for protocol in protocols:
4272 ctx = ssl.SSLContext(protocol)
4273 self.assertEqual(ctx.post_handshake_auth, False)
4274
4275 ctx.post_handshake_auth = True
4276 self.assertEqual(ctx.post_handshake_auth, True)
4277
4278 ctx.verify_mode = ssl.CERT_REQUIRED
4279 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4280 self.assertEqual(ctx.post_handshake_auth, True)
4281
4282 ctx.post_handshake_auth = False
4283 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4284 self.assertEqual(ctx.post_handshake_auth, False)
4285
4286 ctx.verify_mode = ssl.CERT_OPTIONAL
4287 ctx.post_handshake_auth = True
4288 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4289 self.assertEqual(ctx.post_handshake_auth, True)
4290
4291 def test_pha_required(self):
4292 client_context, server_context, hostname = testing_context()
4293 server_context.post_handshake_auth = True
4294 server_context.verify_mode = ssl.CERT_REQUIRED
4295 client_context.post_handshake_auth = True
4296 client_context.load_cert_chain(SIGNED_CERTFILE)
4297
4298 server = ThreadedEchoServer(context=server_context, chatty=False)
4299 with server:
4300 with client_context.wrap_socket(socket.socket(),
4301 server_hostname=hostname) as s:
4302 s.connect((HOST, server.port))
4303 s.write(b'HASCERT')
4304 self.assertEqual(s.recv(1024), b'FALSE\n')
4305 s.write(b'PHA')
4306 self.assertEqual(s.recv(1024), b'OK\n')
4307 s.write(b'HASCERT')
4308 self.assertEqual(s.recv(1024), b'TRUE\n')
4309 # PHA method just returns true when cert is already available
4310 s.write(b'PHA')
4311 self.assertEqual(s.recv(1024), b'OK\n')
4312 s.write(b'GETCERT')
4313 cert_text = s.recv(4096).decode('us-ascii')
4314 self.assertIn('Python Software Foundation CA', cert_text)
4315
4316 def test_pha_required_nocert(self):
4317 client_context, server_context, hostname = testing_context()
4318 server_context.post_handshake_auth = True
4319 server_context.verify_mode = ssl.CERT_REQUIRED
4320 client_context.post_handshake_auth = True
4321
4322 server = ThreadedEchoServer(context=server_context, chatty=False)
4323 with server:
4324 with client_context.wrap_socket(socket.socket(),
4325 server_hostname=hostname) as s:
4326 s.connect((HOST, server.port))
4327 s.write(b'PHA')
4328 # receive CertificateRequest
4329 self.assertEqual(s.recv(1024), b'OK\n')
4330 # send empty Certificate + Finish
4331 s.write(b'HASCERT')
4332 # receive alert
4333 with self.assertRaisesRegex(
4334 ssl.SSLError,
4335 'tlsv13 alert certificate required'):
4336 s.recv(1024)
4337
4338 def test_pha_optional(self):
4339 if support.verbose:
4340 sys.stdout.write("\n")
4341
4342 client_context, server_context, hostname = testing_context()
4343 server_context.post_handshake_auth = True
4344 server_context.verify_mode = ssl.CERT_REQUIRED
4345 client_context.post_handshake_auth = True
4346 client_context.load_cert_chain(SIGNED_CERTFILE)
4347
4348 # check CERT_OPTIONAL
4349 server_context.verify_mode = ssl.CERT_OPTIONAL
4350 server = ThreadedEchoServer(context=server_context, chatty=False)
4351 with server:
4352 with client_context.wrap_socket(socket.socket(),
4353 server_hostname=hostname) as s:
4354 s.connect((HOST, server.port))
4355 s.write(b'HASCERT')
4356 self.assertEqual(s.recv(1024), b'FALSE\n')
4357 s.write(b'PHA')
4358 self.assertEqual(s.recv(1024), b'OK\n')
4359 s.write(b'HASCERT')
4360 self.assertEqual(s.recv(1024), b'TRUE\n')
4361
4362 def test_pha_optional_nocert(self):
4363 if support.verbose:
4364 sys.stdout.write("\n")
4365
4366 client_context, server_context, hostname = testing_context()
4367 server_context.post_handshake_auth = True
4368 server_context.verify_mode = ssl.CERT_OPTIONAL
4369 client_context.post_handshake_auth = True
4370
4371 server = ThreadedEchoServer(context=server_context, chatty=False)
4372 with server:
4373 with client_context.wrap_socket(socket.socket(),
4374 server_hostname=hostname) as s:
4375 s.connect((HOST, server.port))
4376 s.write(b'HASCERT')
4377 self.assertEqual(s.recv(1024), b'FALSE\n')
4378 s.write(b'PHA')
4379 self.assertEqual(s.recv(1024), b'OK\n')
penguindustin96466302019-05-06 14:57:17 -04004380 # optional doesn't fail when client does not have a cert
Christian Heimes9fb051f2018-09-23 08:32:31 +02004381 s.write(b'HASCERT')
4382 self.assertEqual(s.recv(1024), b'FALSE\n')
4383
4384 def test_pha_no_pha_client(self):
4385 client_context, server_context, hostname = testing_context()
4386 server_context.post_handshake_auth = True
4387 server_context.verify_mode = ssl.CERT_REQUIRED
4388 client_context.load_cert_chain(SIGNED_CERTFILE)
4389
4390 server = ThreadedEchoServer(context=server_context, chatty=False)
4391 with server:
4392 with client_context.wrap_socket(socket.socket(),
4393 server_hostname=hostname) as s:
4394 s.connect((HOST, server.port))
4395 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4396 s.verify_client_post_handshake()
4397 s.write(b'PHA')
4398 self.assertIn(b'extension not received', s.recv(1024))
4399
4400 def test_pha_no_pha_server(self):
4401 # server doesn't have PHA enabled, cert is requested in handshake
4402 client_context, server_context, hostname = testing_context()
4403 server_context.verify_mode = ssl.CERT_REQUIRED
4404 client_context.post_handshake_auth = True
4405 client_context.load_cert_chain(SIGNED_CERTFILE)
4406
4407 server = ThreadedEchoServer(context=server_context, chatty=False)
4408 with server:
4409 with client_context.wrap_socket(socket.socket(),
4410 server_hostname=hostname) as s:
4411 s.connect((HOST, server.port))
4412 s.write(b'HASCERT')
4413 self.assertEqual(s.recv(1024), b'TRUE\n')
4414 # PHA doesn't fail if there is already a cert
4415 s.write(b'PHA')
4416 self.assertEqual(s.recv(1024), b'OK\n')
4417 s.write(b'HASCERT')
4418 self.assertEqual(s.recv(1024), b'TRUE\n')
4419
4420 def test_pha_not_tls13(self):
4421 # TLS 1.2
4422 client_context, server_context, hostname = testing_context()
4423 server_context.verify_mode = ssl.CERT_REQUIRED
4424 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4425 client_context.post_handshake_auth = True
4426 client_context.load_cert_chain(SIGNED_CERTFILE)
4427
4428 server = ThreadedEchoServer(context=server_context, chatty=False)
4429 with server:
4430 with client_context.wrap_socket(socket.socket(),
4431 server_hostname=hostname) as s:
4432 s.connect((HOST, server.port))
4433 # PHA fails for TLS != 1.3
4434 s.write(b'PHA')
4435 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4436
4437
Christian Heimesc7f70692019-05-31 11:44:05 +02004438HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
4439requires_keylog = unittest.skipUnless(
4440 HAS_KEYLOG, 'test requires OpenSSL 1.1.1 with keylog callback')
4441
4442class TestSSLDebug(unittest.TestCase):
4443
4444 def keylog_lines(self, fname=support.TESTFN):
4445 with open(fname) as f:
4446 return len(list(f))
4447
4448 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004449 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004450 def test_keylog_defaults(self):
4451 self.addCleanup(support.unlink, support.TESTFN)
4452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4453 self.assertEqual(ctx.keylog_filename, None)
4454
4455 self.assertFalse(os.path.isfile(support.TESTFN))
4456 ctx.keylog_filename = support.TESTFN
4457 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4458 self.assertTrue(os.path.isfile(support.TESTFN))
4459 self.assertEqual(self.keylog_lines(), 1)
4460
4461 ctx.keylog_filename = None
4462 self.assertEqual(ctx.keylog_filename, None)
4463
4464 with self.assertRaises((IsADirectoryError, PermissionError)):
4465 # Windows raises PermissionError
4466 ctx.keylog_filename = os.path.dirname(
4467 os.path.abspath(support.TESTFN))
4468
4469 with self.assertRaises(TypeError):
4470 ctx.keylog_filename = 1
4471
4472 @requires_keylog
Paul Monsonf3550692019-06-19 13:09:54 -07004473 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004474 def test_keylog_filename(self):
4475 self.addCleanup(support.unlink, support.TESTFN)
4476 client_context, server_context, hostname = testing_context()
4477
4478 client_context.keylog_filename = support.TESTFN
4479 server = ThreadedEchoServer(context=server_context, chatty=False)
4480 with server:
4481 with client_context.wrap_socket(socket.socket(),
4482 server_hostname=hostname) as s:
4483 s.connect((HOST, server.port))
4484 # header, 5 lines for TLS 1.3
4485 self.assertEqual(self.keylog_lines(), 6)
4486
4487 client_context.keylog_filename = None
4488 server_context.keylog_filename = support.TESTFN
4489 server = ThreadedEchoServer(context=server_context, chatty=False)
4490 with server:
4491 with client_context.wrap_socket(socket.socket(),
4492 server_hostname=hostname) as s:
4493 s.connect((HOST, server.port))
4494 self.assertGreaterEqual(self.keylog_lines(), 11)
4495
4496 client_context.keylog_filename = support.TESTFN
4497 server_context.keylog_filename = support.TESTFN
4498 server = ThreadedEchoServer(context=server_context, chatty=False)
4499 with server:
4500 with client_context.wrap_socket(socket.socket(),
4501 server_hostname=hostname) as s:
4502 s.connect((HOST, server.port))
4503 self.assertGreaterEqual(self.keylog_lines(), 21)
4504
4505 client_context.keylog_filename = None
4506 server_context.keylog_filename = None
4507
4508 @requires_keylog
4509 @unittest.skipIf(sys.flags.ignore_environment,
4510 "test is not compatible with ignore_environment")
Paul Monsonf3550692019-06-19 13:09:54 -07004511 @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
Christian Heimesc7f70692019-05-31 11:44:05 +02004512 def test_keylog_env(self):
4513 self.addCleanup(support.unlink, support.TESTFN)
4514 with unittest.mock.patch.dict(os.environ):
4515 os.environ['SSLKEYLOGFILE'] = support.TESTFN
4516 self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)
4517
4518 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
4519 self.assertEqual(ctx.keylog_filename, None)
4520
4521 ctx = ssl.create_default_context()
4522 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4523
4524 ctx = ssl._create_stdlib_context()
4525 self.assertEqual(ctx.keylog_filename, support.TESTFN)
4526
4527 def test_msg_callback(self):
4528 client_context, server_context, hostname = testing_context()
4529
4530 def msg_cb(conn, direction, version, content_type, msg_type, data):
4531 pass
4532
4533 self.assertIs(client_context._msg_callback, None)
4534 client_context._msg_callback = msg_cb
4535 self.assertIs(client_context._msg_callback, msg_cb)
4536 with self.assertRaises(TypeError):
4537 client_context._msg_callback = object()
4538
4539 def test_msg_callback_tls12(self):
4540 client_context, server_context, hostname = testing_context()
4541 client_context.options |= ssl.OP_NO_TLSv1_3
4542
4543 msg = []
4544
4545 def msg_cb(conn, direction, version, content_type, msg_type, data):
4546 self.assertIsInstance(conn, ssl.SSLSocket)
4547 self.assertIsInstance(data, bytes)
4548 self.assertIn(direction, {'read', 'write'})
4549 msg.append((direction, version, content_type, msg_type))
4550
4551 client_context._msg_callback = msg_cb
4552
4553 server = ThreadedEchoServer(context=server_context, chatty=False)
4554 with server:
4555 with client_context.wrap_socket(socket.socket(),
4556 server_hostname=hostname) as s:
4557 s.connect((HOST, server.port))
4558
Christian Heimese35d1ba2019-06-03 20:40:15 +02004559 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004560 ("read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
4561 _TLSMessageType.SERVER_KEY_EXCHANGE),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004562 msg
4563 )
4564 self.assertIn(
Christian Heimesc7f70692019-05-31 11:44:05 +02004565 ("write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
4566 _TLSMessageType.CHANGE_CIPHER_SPEC),
Christian Heimese35d1ba2019-06-03 20:40:15 +02004567 msg
4568 )
Christian Heimesc7f70692019-05-31 11:44:05 +02004569
4570
Thomas Woutersed03b412007-08-28 21:37:11 +00004571def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004572 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004573 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004574 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004575 'Mac': platform.mac_ver,
4576 'Windows': platform.win32_ver,
4577 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004578 for name, func in plats.items():
4579 plat = func()
4580 if plat and plat[0]:
4581 plat = '%s %r' % (name, plat)
4582 break
4583 else:
4584 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004585 print("test_ssl: testing with %r %r" %
4586 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4587 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004588 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004589 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4590 try:
4591 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4592 except AttributeError:
4593 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004594
Antoine Pitrou152efa22010-05-16 18:19:27 +00004595 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004596 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004597 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004598 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004599 BADCERT, BADKEY, EMPTYCERT]:
4600 if not os.path.exists(filename):
4601 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004602
Martin Panter3840b2a2016-03-27 01:53:46 +00004603 tests = [
4604 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004605 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimesc7f70692019-05-31 11:44:05 +02004606 TestPostHandshakeAuth, TestSSLDebug
Martin Panter3840b2a2016-03-27 01:53:46 +00004607 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004608
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004609 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004610 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004611
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004612 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004613 try:
4614 support.run_unittest(*tests)
4615 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004616 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004617
4618if __name__ == "__main__":
4619 test_main()