blob: 737a96989968cf5f669563c4dd1ac027f27034d8 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Christian Heimesc7f70692019-05-31 11:44:05 +02005import unittest.mock
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00007import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00008import select
Thomas Woutersed03b412007-08-28 21:37:11 +00009import time
Christian Heimes9424bb42013-06-17 15:32:57 +020010import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000011import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000012import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000013import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000014import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020016import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000017import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000018import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000019import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000020import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
# Import ssl through the test-support helper rather than a plain import.
ssl = support.import_module("ssl")

from ssl import TLSVersion, _TLSContentType, _TLSMessageType

# Debug-build detection: true when sys exposes gettotalrefcount.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
Py_DEBUG_WIN32 = Py_DEBUG and sys.platform == 'win32'

PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Which TLS library (and which version of it) the ssl module is linked with.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000040
# Map each ssl.PROTOCOL_* constant to the ssl.TLSVersion member paired with
# it below.  A pair is skipped when either name is missing from this build
# of the ssl module.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in (
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
):
    try:
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        continue
    PROTOCOL_TO_TLS_VERSION[proto] = ver
53
def data_file(*name):
    """Return the path of a test data file.

    The path components in *name* are joined onto the directory that
    contains this module.
    """
    return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000056
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# Password-protected variants and the password that unlocks them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
75
# Expected decoding of CERTFILE (compared against
# ssl._ssl._test_decode_cert(CERTFILE) in test_parse_cert).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000091
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'

# Expected decoding of SIGNED_CERTFILE (compared against
# ssl._ssl._test_decode_cert(SIGNED_CERTFILE) in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # NOTE(review): single-digit days are padded with a leading space by
    # ASN1_TIME_print() (see asn1time()), hence the two spaces after "Jul".
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100127
REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately broken, missing, or unusual certificate/key fixtures.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000140
# Not defined in all versions of OpenSSL; fall back to 0 when the ssl
# module does not expose the option.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200147
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100148
def handle_error(prefix):
    """Write the current exception's traceback to stdout in verbose mode.

    *prefix* is prepended to the formatted traceback.  Nothing is
    formatted or written unless support.verbose is true.
    """
    if support.verbose:
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000153
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000157
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL version is 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
161
def have_verify_flags():
    """Return True if the linked OpenSSL version is 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
165
def _have_secp_curves():
    """Return True if a server context accepts the "secp384r1" ECDH curve.

    Returns False when the ssl module reports no ECDH support, or when
    set_ecdh_curve() rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        ctx.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    else:
        return True


# Probed once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
179
180
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    Uses -time.altzone when DST is defined and currently in effect,
    otherwise -time.timezone.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
186
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    *cert_time* is a string in the "%b %d %H:%M:%S %Y GMT" layout.  It is
    returned unchanged unless the OpenSSL API version is exactly 0.9.8i, in
    which case the seconds are zeroed.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000200
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
202
Antoine Pitrou23df4832010-08-04 17:14:06 +0000203
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext.

    The context is created for *ssl_version* and configured from the
    keyword-only arguments; each one is applied only when it is not None.
    Remaining keyword arguments are passed to SSLContext.wrap_socket(),
    whose result is returned.
    """
    context = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # check_hostname is switched off before verify_mode is set to
        # CERT_NONE.
        if cert_reqs == ssl.CERT_NONE:
            context.check_hostname = False
        context.verify_mode = cert_reqs
    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    if certfile is not None or keyfile is not None:
        context.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **kwargs)
220
Christian Heimesa170fa12017-09-15 20:27:30 +0200221
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    The client context trusts SIGNING_CA; the server context loads
    *server_cert* and also trusts SIGNING_CA.  *hostname* is the hostname
    constant paired with *server_cert*.  Raises ValueError when
    *server_cert* is neither SIGNED_CERTFILE nor SIGNED_CERTFILE2.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
242
243
Antoine Pitrou152efa22010-05-16 18:19:27 +0000244class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000245
def test_constants(self):
    # Bare attribute accesses: each raises AttributeError if the ssl
    # module does not define the constant.
    ssl.CERT_NONE
    ssl.CERT_OPTIONAL
    ssl.CERT_REQUIRED
    ssl.OP_CIPHER_SERVER_PREFERENCE
    ssl.OP_SINGLE_DH_USE
    if ssl.HAS_ECDH:
        ssl.OP_SINGLE_ECDH_USE
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        ssl.OP_NO_COMPRESSION
    self.assertIn(ssl.HAS_SNI, {True, False})
    self.assertIn(ssl.HAS_ECDH, {True, False})
    ssl.OP_NO_SSLv2
    ssl.OP_NO_SSLv3
    ssl.OP_NO_TLSv1
    ssl.OP_NO_TLSv1_3
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        ssl.OP_NO_TLSv1_1
        ssl.OP_NO_TLSv1_2
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000266
def test_private_init(self):
    # Calling ssl.SSLSocket() directly must raise TypeError with a
    # message mentioning "public constructor".
    with self.assertRaisesRegex(TypeError, "public constructor"):
        with socket.socket() as s:
            ssl.SSLSocket(s)
271
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs.
    proto = ssl.PROTOCOL_TLS
    self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
    ctx = ssl.SSLContext(proto)
    self.assertIs(ctx.protocol, proto)
279
def test_random(self):
    # Exercise the RAND_* functions: status, (pseudo-)random bytes,
    # argument validation and RAND_add() with str/bytes/bytearray.
    v = ssl.RAND_status()
    if support.verbose:
        sys.stdout.write("\n RAND_status is %d (%s)\n"
                         % (v, "sufficient randomness" if v
                            else "insufficient randomness"))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if v:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)
    else:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)

    # negative num is invalid
    self.assertRaises(ValueError, ssl.RAND_bytes, -5)
    self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)

    # RAND_egd is not available in every build.
    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    ssl.RAND_add("this is a random string", 75.0)
    ssl.RAND_add(b"this is a random bytes object", 75.0)
    ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000306
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # After fork(), parent and child must not produce the same
    # pseudo-random bytes.
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send 16 pseudo-random bytes through the pipe and report
        # success or failure through the exit status only.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent: wait for the child, then compare its bytes with ours.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
338
# unittest.TestCase setting: None removes the length limit on failure diffs.
maxDiff = None
340
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    self.assertEqual(
        ssl._ssl._test_decode_cert(CERTFILE),
        CERTFILE_INFO
    )
    self.assertEqual(
        ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
        SIGNED_CERTFILE_INFO
    )

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                     )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000369
def test_parse_cert_CVE_2019_5010(self):
    # Decoding the TALOS_INVALID_CRLDP certificate must succeed and
    # yield exactly the fields below.
    p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(
        p,
        {
            'issuer': (
                (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
            'notAfter': 'Jun 14 18:00:58 2028 GMT',
            'notBefore': 'Jun 18 18:00:58 2018 GMT',
            'serialNumber': '02',
            'subject': ((('countryName', 'UK'),),
                        (('commonName',
                          'codenomicon-vm-2.test.lal.cisco.com'),)),
            'subjectAltName': (
                ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
            'version': 3
        }
    )
390
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate fields must be preserved by the
    # decoder rather than truncating the value.
    p = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    subject = ((('countryName', 'US'),),
               (('stateOrProvinceName', 'Oregon'),),
               (('localityName', 'Beaverton'),),
               (('organizationName', 'Python Software Foundation'),),
               (('organizationalUnitName', 'Python Core Development'),),
               (('commonName', 'null.python.org\x00example.org'),),
               (('emailAddress', 'python-dev@python.org'),))
    self.assertEqual(p['subject'], subject)
    self.assertEqual(p['issuer'], subject)

    # Only the final (IPv6) entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    san = (('DNS', 'altnull.python.org\x00example.com'),
           ('email', 'null@python.org\x00user@example.org'),
           ('URI', 'http://null.python.org\x00http://example.org'),
           ('IP Address', '192.0.2.1'),
           ipv6_entry)

    self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200419
def test_parse_all_sans(self):
    # ALLSANFILE carries one subjectAltName entry of every kind listed
    # below; check each is decoded as expected.
    p = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(p['subjectAltName'],
        (
            ('DNS', 'allsans'),
            ('othername', '<unsupported>'),
            ('othername', '<unsupported>'),
            ('email', 'user@example.org'),
            ('DNS', 'www.example.org'),
            ('DirName',
                ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'dirname example'),))),
            ('URI', 'https://www.python.org/'),
            ('IP Address', '127.0.0.1'),
            ('IP Address', '0:0:0:0:0:0:0:1\n'),
            ('Registered ID', '1.2.3.4.5')
        )
    )
440
def test_DER_to_PEM(self):
    # Round-trip PEM -> DER -> PEM -> DER: the two DER forms must match
    # and the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as f:
        pem = f.read()
    d1 = ssl.PEM_cert_to_DER_cert(pem)
    p2 = ssl.DER_cert_to_PEM_cert(d1)
    d2 = ssl.PEM_cert_to_DER_cert(p2)
    self.assertEqual(d1, d2)
    if not p2.startswith(ssl.PEM_HEADER + '\n'):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
    if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000452
def test_openssl_version(self):
    # Sanity-check the three OPENSSL_VERSION* constants: their types,
    # their value ranges, and that the string agrees with the tuple.
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    self.assertIsInstance(n, int)
    self.assertIsInstance(t, tuple)
    self.assertIsInstance(s, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 3.0
    self.assertLess(n, 0x30000000)
    major, minor, fix, patch, status = t
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
                        (s, t, hex(n)))
    else:
        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                        (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000483
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    s = socket.socket(socket.AF_INET)
    ss = test_wrap_socket(s)
    wr = weakref.ref(ss)
    # Dropping the last reference may emit a ResourceWarning.
    with support.check_warnings(("", ResourceWarning)):
        del ss
    # The weak reference must be dead as soon as the name is deleted.
    self.assertIsNone(wr())
Antoine Pitrou9d543662010-04-23 23:10:32 +0000494
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    s = socket.socket(socket.AF_INET)
    with test_wrap_socket(s) as ss:
        self.assertRaises(OSError, ss.recv, 1)
        self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
        self.assertRaises(OSError, ss.recvfrom, 1)
        self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
        self.assertRaises(OSError, ss.send, b'x')
        self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
        # These operations are expected to be unsupported on SSL sockets.
        self.assertRaises(NotImplementedError, ss.dup)
        self.assertRaises(NotImplementedError, ss.sendmsg,
                          [b'x'], (), 0, ('0.0.0.0', 0))
        self.assertRaises(NotImplementedError, ss.recvmsg, 100)
        self.assertRaises(NotImplementedError, ss.recvmsg_into,
                          [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000512
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for timeout in (None, 0.0, 5.0):
        s = socket.socket(socket.AF_INET)
        s.settimeout(timeout)
        with test_wrap_socket(s) as ss:
            self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000521
def test_errors_sslwrap(self):
    # ssl.wrap_socket() must reject inconsistent certificate arguments
    # and report ENOENT for certificate/key files that do not exist.
    sock = socket.socket()
    # Ensure the socket is released even if one of the checks below fails
    # before the ``with`` block takes ownership of it.
    self.addCleanup(sock.close)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True, certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))
    with self.assertRaises(OSError) as cm:
        with socket.socket() as tmp_sock:
            ssl.wrap_socket(tmp_sock, certfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as tmp_sock:
            ssl.wrap_socket(tmp_sock,
                            certfile=CERTFILE, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as tmp_sock:
            ssl.wrap_socket(tmp_sock,
                            certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000550
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # certfile is resolved relative to this module's directory (or the
    # current directory when __file__ has no directory part).
    certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
                            certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock,
                         certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000560
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    self.bad_cert_test("nullcert.pem")
564
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    self.bad_cert_test("badcert.pem")
568
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    self.bad_cert_test("badkey.pem")
572
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000573 def test_match_hostname(self):
574 def ok(cert, hostname):
575 ssl.match_hostname(cert, hostname)
576 def fail(cert, hostname):
577 self.assertRaises(ssl.CertificateError,
578 ssl.match_hostname, cert, hostname)
579
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100580 # -- Hostname matching --
581
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000582 cert = {'subject': ((('commonName', 'example.com'),),)}
583 ok(cert, 'example.com')
584 ok(cert, 'ExAmple.cOm')
585 fail(cert, 'www.example.com')
586 fail(cert, '.example.com')
587 fail(cert, 'example.org')
588 fail(cert, 'exampleXcom')
589
590 cert = {'subject': ((('commonName', '*.a.com'),),)}
591 ok(cert, 'foo.a.com')
592 fail(cert, 'bar.foo.a.com')
593 fail(cert, 'a.com')
594 fail(cert, 'Xa.com')
595 fail(cert, '.a.com')
596
Mandeep Singhede2ac92017-11-27 04:01:27 +0530597 # only match wildcards when they are the only thing
598 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530600 fail(cert, 'foo.com')
601 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000602 fail(cert, 'bar.com')
603 fail(cert, 'foo.a.com')
604 fail(cert, 'bar.foo.com')
605
Christian Heimes824f7f32013-08-17 00:54:47 +0200606 # NULL bytes are bad, CVE-2013-4073
607 cert = {'subject': ((('commonName',
608 'null.python.org\x00example.org'),),)}
609 ok(cert, 'null.python.org\x00example.org') # or raise an error?
610 fail(cert, 'example.org')
611 fail(cert, 'null.python.org')
612
Georg Brandl72c98d32013-10-27 07:16:53 +0100613 # error cases with wildcards
614 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
615 fail(cert, 'bar.foo.a.com')
616 fail(cert, 'a.com')
617 fail(cert, 'Xa.com')
618 fail(cert, '.a.com')
619
620 cert = {'subject': ((('commonName', 'a.*.com'),),)}
621 fail(cert, 'a.foo.com')
622 fail(cert, 'a..com')
623 fail(cert, 'a.com')
624
625 # wildcard doesn't match IDNA prefix 'xn--'
626 idna = 'püthon.python.org'.encode("idna").decode("ascii")
627 cert = {'subject': ((('commonName', idna),),)}
628 ok(cert, idna)
629 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
630 fail(cert, idna)
631 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
632 fail(cert, idna)
633
634 # wildcard in first fragment and IDNA A-labels in sequent fragments
635 # are supported.
636 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
637 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530638 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
639 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100640 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
641 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
642
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000643 # Slightly fake real-world example
644 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
645 'subject': ((('commonName', 'linuxfrz.org'),),),
646 'subjectAltName': (('DNS', 'linuxfr.org'),
647 ('DNS', 'linuxfr.com'),
648 ('othername', '<unsupported>'))}
649 ok(cert, 'linuxfr.org')
650 ok(cert, 'linuxfr.com')
651 # Not a "DNS" entry
652 fail(cert, '<unsupported>')
653 # When there is a subjectAltName, commonName isn't used
654 fail(cert, 'linuxfrz.org')
655
656 # A pristine real-world example
657 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('organizationName', 'Google Inc'),),
662 (('commonName', 'mail.google.com'),))}
663 ok(cert, 'mail.google.com')
664 fail(cert, 'gmail.com')
665 # Only commonName is considered
666 fail(cert, 'California')
667
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100668 # -- IPv4 matching --
669 cert = {'subject': ((('commonName', 'example.com'),),),
670 'subjectAltName': (('DNS', 'example.com'),
671 ('IP Address', '10.11.12.13'),
Christian Heimes477b1b22019-07-02 20:39:42 +0200672 ('IP Address', '14.15.16.17'),
673 ('IP Address', '127.0.0.1'))}
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100674 ok(cert, '10.11.12.13')
675 ok(cert, '14.15.16.17')
Christian Heimes477b1b22019-07-02 20:39:42 +0200676 # socket.inet_ntoa(socket.inet_aton('127.1')) == '127.0.0.1'
677 fail(cert, '127.1')
678 fail(cert, '14.15.16.17 ')
679 fail(cert, '14.15.16.17 extra data')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100680 fail(cert, '14.15.16.18')
681 fail(cert, 'example.net')
682
683 # -- IPv6 matching --
Zackery Spytzc2cda632019-06-30 09:24:43 -0600684 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100685 cert = {'subject': ((('commonName', 'example.com'),),),
686 'subjectAltName': (
687 ('DNS', 'example.com'),
688 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
689 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
690 ok(cert, '2001::cafe')
691 ok(cert, '2003::baba')
Christian Heimes477b1b22019-07-02 20:39:42 +0200692 fail(cert, '2003::baba ')
693 fail(cert, '2003::baba extra data')
Christian Heimesaef12832018-02-24 14:35:56 +0100694 fail(cert, '2003::bebe')
695 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100696
697 # -- Miscellaneous --
698
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000699 # Neither commonName nor subjectAltName
700 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
701 'subject': ((('countryName', 'US'),),
702 (('stateOrProvinceName', 'California'),),
703 (('localityName', 'Mountain View'),),
704 (('organizationName', 'Google Inc'),))}
705 fail(cert, 'mail.google.com')
706
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200707 # No DNS entry in subjectAltName but a commonName
708 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
709 'subject': ((('countryName', 'US'),),
710 (('stateOrProvinceName', 'California'),),
711 (('localityName', 'Mountain View'),),
712 (('commonName', 'mail.google.com'),)),
713 'subjectAltName': (('othername', 'blabla'), )}
714 ok(cert, 'mail.google.com')
715
716 # No DNS entry subjectAltName and no commonName
717 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
718 'subject': ((('countryName', 'US'),),
719 (('stateOrProvinceName', 'California'),),
720 (('localityName', 'Mountain View'),),
721 (('organizationName', 'Google Inc'),)),
722 'subjectAltName': (('othername', 'blabla'),)}
723 fail(cert, 'google.com')
724
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000725 # Empty cert / no cert
726 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
727 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
728
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200729 # Issue #17980: avoid denials of service by refusing more than one
730 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100731 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
732 with self.assertRaisesRegex(
733 ssl.CertificateError,
734 "partial wildcards in leftmost label are not supported"):
735 ssl.match_hostname(cert, 'axxb.example.com')
736
737 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
738 with self.assertRaisesRegex(
739 ssl.CertificateError,
740 "wildcard can only be present in the leftmost label"):
741 ssl.match_hostname(cert, 'www.sub.example.com')
742
743 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
744 with self.assertRaisesRegex(
745 ssl.CertificateError,
746 "too many wildcards"):
747 ssl.match_hostname(cert, 'axxbxxc.example.com')
748
749 cert = {'subject': ((('commonName', '*'),),)}
750 with self.assertRaisesRegex(
751 ssl.CertificateError,
752 "sole wildcard without additional labels are not support"):
753 ssl.match_hostname(cert, 'host')
754
755 cert = {'subject': ((('commonName', '*.com'),),)}
756 with self.assertRaisesRegex(
757 ssl.CertificateError,
758 r"hostname 'com' doesn't match '\*.com'"):
759 ssl.match_hostname(cert, 'com')
760
761 # extra checks for _inet_paton()
762 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
763 with self.assertRaises(ValueError):
764 ssl._inet_paton(invalid)
765 for ipaddr in ['127.0.0.1', '192.168.0.1']:
766 self.assertTrue(ssl._inet_paton(ipaddr))
Zackery Spytzc2cda632019-06-30 09:24:43 -0600767 if support.IPV6_ENABLED:
Christian Heimesaef12832018-02-24 14:35:56 +0100768 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
769 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200770
Antoine Pitroud5323212010-10-22 18:19:07 +0000771 def test_server_side(self):
772 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200773 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000774 with socket.socket() as sock:
775 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
776 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000777
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type."""
    listener = socket.create_server(('127.0.0.1', 0))
    # Register the cleanups immediately: a trailing close() would be skipped
    # (leaking the sockets) whenever connect() or an assertion below fails.
    self.addCleanup(listener.close)
    client = socket.socket(socket.AF_INET)
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    # No handshake is performed; only the argument validation is exercised.
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200787
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket has no 'tls-unique' binding on either side."""
    wrap_options = (
        {},                                           # client side
        {"server_side": True, "certfile": CERTFILE},  # server side
    )
    for options in wrap_options:
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **options) as wrapped:
            # Known binding type, but nothing is connected yet.
            self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200799
def test_dealloc_warn(self):
    """Dropping an unclosed SSL socket emits a ResourceWarning naming it."""
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    description = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Release the only reference, then force a collection pass.
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(description, warning_text)
807
def test_get_default_verify_paths(self):
    """get_default_verify_paths() honours SSL_CERT_DIR / SSL_CERT_FILE."""
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    # Override both environment variables and check they are picked up.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
819
Christian Heimes44109d72013-11-22 01:51:30 +0100820 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
821 def test_enum_certificates(self):
822 self.assertTrue(ssl.enum_certificates("CA"))
823 self.assertTrue(ssl.enum_certificates("ROOT"))
824
825 self.assertRaises(TypeError, ssl.enum_certificates)
826 self.assertRaises(WindowsError, ssl.enum_certificates, "")
827
Christian Heimesc2d65e12013-11-22 16:13:55 +0100828 trust_oids = set()
829 for storename in ("CA", "ROOT"):
830 store = ssl.enum_certificates(storename)
831 self.assertIsInstance(store, list)
832 for element in store:
833 self.assertIsInstance(element, tuple)
834 self.assertEqual(len(element), 3)
835 cert, enc, trust = element
836 self.assertIsInstance(cert, bytes)
837 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
Christian Heimes915cd3f2019-09-09 18:06:55 +0200838 self.assertIsInstance(trust, (frozenset, set, bool))
839 if isinstance(trust, (frozenset, set)):
Christian Heimesc2d65e12013-11-22 16:13:55 +0100840 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100841
842 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100843 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200844
Christian Heimes46bebee2013-06-09 19:03:31 +0200845 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100846 def test_enum_crls(self):
847 self.assertTrue(ssl.enum_crls("CA"))
848 self.assertRaises(TypeError, ssl.enum_crls)
849 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200850
Christian Heimes44109d72013-11-22 01:51:30 +0100851 crls = ssl.enum_crls("CA")
852 self.assertIsInstance(crls, list)
853 for element in crls:
854 self.assertIsInstance(element, tuple)
855 self.assertEqual(len(element), 2)
856 self.assertIsInstance(element[0], bytes)
857 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200858
Christian Heimes46bebee2013-06-09 19:03:31 +0200859
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100860 def test_asn1object(self):
861 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
862 '1.3.6.1.5.5.7.3.1')
863
864 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
865 self.assertEqual(val, expected)
866 self.assertEqual(val.nid, 129)
867 self.assertEqual(val.shortname, 'serverAuth')
868 self.assertEqual(val.longname, 'TLS Web Server Authentication')
869 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
870 self.assertIsInstance(val, ssl._ASN1Object)
871 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
872
873 val = ssl._ASN1Object.fromnid(129)
874 self.assertEqual(val, expected)
875 self.assertIsInstance(val, ssl._ASN1Object)
876 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100877 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
878 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100879 for i in range(1000):
880 try:
881 obj = ssl._ASN1Object.fromnid(i)
882 except ValueError:
883 pass
884 else:
885 self.assertIsInstance(obj.nid, int)
886 self.assertIsInstance(obj.shortname, str)
887 self.assertIsInstance(obj.longname, str)
888 self.assertIsInstance(obj.oid, (str, type(None)))
889
890 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
891 self.assertEqual(val, expected)
892 self.assertIsInstance(val, ssl._ASN1Object)
893 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
894 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
895 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100896 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
897 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100898
Christian Heimes72d28502013-11-23 13:56:58 +0100899 def test_purpose_enum(self):
900 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
901 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
902 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
903 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
904 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
905 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
906 '1.3.6.1.5.5.7.3.1')
907
908 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
909 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
910 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
911 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
912 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
913 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
914 '1.3.6.1.5.5.7.3.2')
915
def test_unsupported_dtls(self):
    """Wrapping a datagram socket raises NotImplementedError."""
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Both the module-level helper and the context method must refuse.
    wrappers = (
        lambda: test_wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE),
        lambda: client_ctx.wrap_socket(udp_sock),
    )
    for wrap in wrappers:
        with self.assertRaises(NotImplementedError) as caught:
            wrap()
        self.assertEqual(str(caught.exception),
                         "only stream sockets are supported")
926
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp* seconds."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
929
def cert_time_fail(self, timestring):
    """Assert that *timestring* is rejected with ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
933
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    """Issue #19940: results must not depend on the local timezone."""
    # cert_time_to_seconds() used to return wrong results when the local
    # timezone was not UTC.
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for text, seconds in known_conversions:
        self.cert_time_ok(text, seconds)
941
def test_cert_time_to_seconds(self):
    """Check which certificate time strings are accepted or rejected."""
    reference_text = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference_text, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference_text),
                     reference_ts)
    same_instant = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for text in same_instant:
        self.cert_time_ok(text, reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for text in malformed:
        self.cert_time_fail(text)

    newyear_ts = 1230768000.0
    around_new_year = (
        "Dec 31 23:59:60 2008 GMT",  # leap seconds
        "Jan 1 00:00:00 2009 GMT",   # same timestamp
    )
    for text in around_new_year:
        self.cert_time_ok(text, newyear_ts)

    seconds_field = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for text, seconds in seconds_field:
        self.cert_time_ok(text, seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
976
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    """`cert_time_to_seconds()` should be locale independent."""
    # Abbreviated name of month 2 as formatted under the active locale.
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale name is accepted, the localized name is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
991
def test_connect_ex_error(self):
    """connect_ex() to a bound but non-listening port returns an errno."""
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        # Issue #19919: Windows machines or VMs hosted on Windows
        # machines sometimes return EWOULDBLOCK.
        errno.EWOULDBLOCK,
    )
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    port = support.bind_port(placeholder)
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    self.assertIn(result, acceptable)
1007
Christian Heimesa6bc95a2013-11-17 19:59:14 +01001008
Antoine Pitrou152efa22010-05-16 18:19:27 +00001009class ContextTests(unittest.TestCase):
1010
def test_constructor(self):
    """SSLContext accepts every known protocol and rejects bogus values."""
    for known in PROTOCOLS:
        ssl.SSLContext(known)
    # Without an argument the context reports PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
1018
def test_protocol(self):
    """The ``protocol`` attribute echoes the constructor argument."""
    for requested in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(requested).protocol, requested)
1023
1024 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001025 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026 ctx.set_ciphers("ALL")
1027 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001028 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001029 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001030
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    """No enabled cipher suite name contains a banned algorithm token."""
    banned_tokens = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for token in banned_tokens:
            self.assertNotIn(token, suite_name)
1043
Christian Heimes25bfcd52016-09-06 00:04:45 +02001044 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1045 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001046 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001047 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001048 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001049 self.assertIn('AES256-GCM-SHA384', names)
1050 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001051
def test_options(self):
    """Check the default ``options`` value and setting/clearing of flags."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected = (
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        # SSLContext also enables these by default
        | OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE
        | OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE
        | OP_ENABLE_MIDDLEBOX_COMPAT
    )
    self.assertEqual(expected, ctx.options)
    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Flags cannot be removed here, so resetting them must fail.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1072
Christian Heimesa170fa12017-09-15 20:27:30 +02001073 def test_verify_mode_protocol(self):
1074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001075 # Default value
1076 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1077 ctx.verify_mode = ssl.CERT_OPTIONAL
1078 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1079 ctx.verify_mode = ssl.CERT_REQUIRED
1080 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1081 ctx.verify_mode = ssl.CERT_NONE
1082 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1083 with self.assertRaises(TypeError):
1084 ctx.verify_mode = None
1085 with self.assertRaises(ValueError):
1086 ctx.verify_mode = 42
1087
Christian Heimesa170fa12017-09-15 20:27:30 +02001088 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1089 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1090 self.assertFalse(ctx.check_hostname)
1091
1092 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1093 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1094 self.assertTrue(ctx.check_hostname)
1095
Christian Heimes61d478c2018-01-27 15:51:38 +01001096 def test_hostname_checks_common_name(self):
1097 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1098 self.assertTrue(ctx.hostname_checks_common_name)
1099 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1100 ctx.hostname_checks_common_name = True
1101 self.assertTrue(ctx.hostname_checks_common_name)
1102 ctx.hostname_checks_common_name = False
1103 self.assertFalse(ctx.hostname_checks_common_name)
1104 ctx.hostname_checks_common_name = True
1105 self.assertTrue(ctx.hostname_checks_common_name)
1106 else:
1107 with self.assertRaises(AttributeError):
1108 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001109
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
def test_min_max_version(self):
    """Check reading and writing ``minimum_version``/``maximum_version``."""
    tls = ssl.TLSVersion
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    accepted_minimums = {
        tls.MINIMUM_SUPPORTED,
        # Fedora 29 uses TLS 1.0 by default
        tls.TLSv1,
        # RHEL 8 uses TLS 1.2 by default
        tls.TLSv1_2,
    }
    self.assertIn(ctx.minimum_version, accepted_minimums)
    self.assertEqual(ctx.maximum_version, tls.MAXIMUM_SUPPORTED)

    # Explicit bounds read back unchanged.
    ctx.minimum_version = tls.TLSv1_1
    ctx.maximum_version = tls.TLSv1_2
    self.assertEqual(ctx.minimum_version, tls.TLSv1_1)
    self.assertEqual(ctx.maximum_version, tls.TLSv1_2)

    ctx.minimum_version = tls.MINIMUM_SUPPORTED
    ctx.maximum_version = tls.TLSv1
    self.assertEqual(ctx.minimum_version, tls.MINIMUM_SUPPORTED)
    self.assertEqual(ctx.maximum_version, tls.TLSv1)

    ctx.maximum_version = tls.MAXIMUM_SUPPORTED
    self.assertEqual(ctx.maximum_version, tls.MAXIMUM_SUPPORTED)

    # Assigning the "wrong" sentinel to a bound yields a concrete version.
    ctx.maximum_version = tls.MINIMUM_SUPPORTED
    self.assertIn(ctx.maximum_version, {tls.TLSv1, tls.SSLv3})

    ctx.minimum_version = tls.MAXIMUM_SUPPORTED
    self.assertIn(ctx.minimum_version, {tls.TLSv1_2, tls.TLSv1_3})

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created for one fixed protocol version reports the
    # sentinels and refuses any assignment to its bounds.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertEqual(pinned_ctx.minimum_version, tls.MINIMUM_SUPPORTED)
    self.assertEqual(pinned_ctx.maximum_version, tls.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = tls.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = tls.TLSv1
1179
1180
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    """Check the default ``verify_flags`` and assignment of new values."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    single_flags = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
    )
    for flag in single_flags:
        server_ctx.verify_flags = flag
        self.assertEqual(server_ctx.verify_flags, flag)
    # supports any value
    combined = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
    server_ctx.verify_flags = combined
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
1200
def test_load_cert_chain(self):
    """Exercise load_cert_chain() with valid, invalid and protected input."""

    def expect_pem_error(context, *args, **kwargs):
        # The load must fail with an SSLError mentioning "PEM lib".
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(*args, **kwargs)

    def password_forms():
        # Fresh str / bytes / bytearray spellings of the key password.
        encoded = KEY_PASSWORD.encode()
        return (KEY_PASSWORD, encoded, bytearray(encoded))

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    expect_pem_error(ctx, BADCERT)
    expect_pem_error(ctx, EMPTYCERT)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    expect_pem_error(ctx, ONLYCERT)
    expect_pem_error(ctx, ONLYKEY)
    expect_pem_error(ctx, certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in password_forms():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def failing_callback():
        raise Exception('getpass error')

    class PasswordProvider:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        lambda: KEY_PASSWORD,                      # str
        lambda: KEY_PASSWORD.encode(),             # bytes
        lambda: bytearray(KEY_PASSWORD.encode()),  # bytearray
        PasswordProvider(),                        # callable instance
        PasswordProvider().getpass,                # bound method
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001283
def test_load_verify_locations(self):
    """Exercise load_verify_locations() with valid and invalid arguments."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # str and bytes paths, passed positionally and then by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(cafile)
        ctx.load_verify_locations(cafile=cafile, capath=None)

    # At least one real location has to be supplied.
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)

    with self.assertRaises(OSError) as missing:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)

    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1302
def test_load_verify_cadata(self):
    """Check the cadata argument of load_verify_locations() for PEM and DER."""
    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # read both CA certificates and derive their DER encodings
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # PEM, one certificate at a time
    pem_ctx = new_context()
    assert_ca_count(pem_ctx, 0)
    pem_ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(pem_ctx, 1)
    pem_ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(pem_ctx, 2)
    # cert already in hash table
    pem_ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(pem_ctx, 2)

    # PEM, both certificates in a single string
    joined_ctx = new_context()
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    joined_ctx.load_verify_locations(cadata=pem_bundle)
    assert_ca_count(joined_ctx, 2)

    # PEM with junk around the certs
    noisy_ctx = new_context()
    noisy_parts = ("head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail")
    noisy_ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(noisy_ctx, 2)

    # DER, one certificate at a time
    der_ctx = new_context()
    der_ctx.load_verify_locations(cadata=cacert_der)
    der_ctx.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(der_ctx, 2)
    # cert already in hash table
    der_ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(der_ctx, 2)

    # DER, both certificates concatenated
    der_joined_ctx = new_context()
    der_bundle = b"".join((cacert_der, neuronio_der))
    der_joined_ctx.load_verify_locations(cadata=der_bundle)
    assert_ca_count(der_joined_ctx, 2)

    # error cases
    failing_ctx = new_context()
    with self.assertRaises(TypeError):
        failing_ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        failing_ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        failing_ctx.load_verify_locations(cadata=b"broken")
1359
1360
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_load_dh_params(self):
    """Check SSLContext.load_dh_params() argument handling and errors."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    # the bytes file name is only exercised when os.name is not 'nt'
    if os.name != 'nt':
        ctx.load_dh_params(BYTES_DHFILE)
    self.assertRaises(TypeError, ctx.load_dh_params)
    self.assertRaises(TypeError, ctx.load_dh_params, None)
    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # Loading CERTFILE as DH parameters must fail; only the exception type
    # is checked here, so the context manager result is not bound.
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1374
def test_session_stats(self):
    """A freshly created context reports zero for every session counter."""
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        fresh_ctx = ssl.SSLContext(protocol)
        self.assertEqual(fresh_ctx.session_stats(), all_zero)
1391
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001392 def test_set_default_verify_paths(self):
1393 # There's not much we can do to test that it acts as expected,
1394 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001395 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001396 ctx.set_default_verify_paths()
1397
Antoine Pitrou501da612011-12-21 09:27:41 +01001398 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001399 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001400 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001401 ctx.set_ecdh_curve("prime256v1")
1402 ctx.set_ecdh_curve(b"prime256v1")
1403 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1404 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1405 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1406 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1407
@needs_sni
def test_sni_callback(self):
    """set_servername_callback() accepts only a callable or None."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    context.set_servername_callback(None)
    context.set_servername_callback(noop_callback)
1422
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The default argument makes the callback keep a reference to ctx;
    # registering the callback on ctx below is what forms the cycle.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass
    ctx.set_servername_callback(dummycallback)
    # Track the context without keeping it alive.
    wr = weakref.ref(ctx)
    # Drop the local strong references, then force a collection.
    del ctx, dummycallback
    gc.collect()
    # A dead weak reference shows the context was actually freed.
    self.assertIs(wr(), None)
1435
def test_cert_store_stats(self):
    """cert_store_stats() tracks certificates added for verification."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_store(x509, x509_ca):
        self.assertEqual(context.cert_store_stats(),
                         dict(x509_ca=x509_ca, crl=0, x509=x509))

    assert_store(x509=0, x509_ca=0)
    # load_cert_chain() is expected to leave the counters unchanged
    context.load_cert_chain(CERTFILE)
    assert_store(x509=0, x509_ca=0)
    # CERTFILE is counted as x509 but not as x509_ca
    context.load_verify_locations(CERTFILE)
    assert_store(x509=1, x509_ca=0)
    # CAFILE_CACERT is counted in both
    context.load_verify_locations(CAFILE_CACERT)
    assert_store(x509=2, x509_ca=1)
1449
def test_get_ca_certs(self):
    """get_ca_certs() lists CA certificates, decoded or as DER bytes."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    decoded = context.get_ca_certs()

    # issuer and subject carry the same name in the expected value
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_info = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(decoded, [expected_info])

    # with a true argument the certificates come back as DER bytes
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1477
Christian Heimes72d28502013-11-23 13:56:58 +01001478 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001479 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001480 ctx.load_default_certs()
1481
Christian Heimesa170fa12017-09-15 20:27:30 +02001482 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001483 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1484 ctx.load_default_certs()
1485
Christian Heimesa170fa12017-09-15 20:27:30 +02001486 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001487 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1488
Christian Heimesa170fa12017-09-15 20:27:30 +02001489 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001490 self.assertRaises(TypeError, ctx.load_default_certs, None)
1491 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1492
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    """load_default_certs() is checked with SSL_CERT_DIR / SSL_CERT_FILE set."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as environ:
        for variable, location in overrides:
            environ[variable] = location
        context.load_default_certs()
        stats = context.cert_store_stats()
        self.assertEqual(stats, dict(crl=0, x509=1, x509_ca=0))
1502
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    """With SSL_CERT_* set, one more x509 entry than the default is loaded."""
    # baseline: what load_default_certs() yields without the variables
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected = baseline_ctx.cert_store_stats()

    env_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        env_ctx.load_default_certs()
        expected["x509"] = expected["x509"] + 1
        self.assertEqual(env_ctx.cert_store_stats(), expected)
1517
def _assert_context_options(self, ctx):
    """Assert that the expected option bits are set on ctx.

    OP_NO_SSLv2 is always checked; each of the other flags is checked
    only when its constant is non-zero.
    """
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        # presumably 0 marks an option this build does not provide -- verify
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1532
def test_create_default_context(self):
    """Check protocol, verify mode and options of create_default_context()."""
    def assert_protocol_and_mode(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)

    # no arguments
    default_ctx = ssl.create_default_context()
    assert_protocol_and_mode(default_ctx, ssl.CERT_REQUIRED)
    self.assertTrue(default_ctx.check_hostname)
    self._assert_context_options(default_ctx)

    # explicit CA file, CA directory and CA data together
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    located_ctx = ssl.create_default_context(cafile=SIGNING_CA,
                                             capath=CAPATH,
                                             cadata=ca_text)
    assert_protocol_and_mode(located_ctx, ssl.CERT_REQUIRED)
    self._assert_context_options(located_ctx)

    # CLIENT_AUTH purpose
    client_auth_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    assert_protocol_and_mode(client_auth_ctx, ssl.CERT_NONE)
    self._assert_context_options(client_auth_ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001553
def test__create_stdlib_context(self):
    """Check the contexts returned by ssl._create_stdlib_context()."""
    def verify(context, protocol, verify_mode, hostname_checked=None):
        # hostname_checked=None means check_hostname is not asserted
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    strict_ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                            cert_reqs=ssl.CERT_REQUIRED,
                                            check_hostname=True)
    verify(strict_ctx, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
           hostname_checked=True)

    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001578
Christian Heimes1aa9a752013-12-02 02:41:19 +01001579 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001580 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001581 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001582 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001583
Christian Heimese82c0342017-09-15 20:29:57 +02001584 # Auto set CERT_REQUIRED
1585 ctx.check_hostname = True
1586 self.assertTrue(ctx.check_hostname)
1587 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1588 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001589 ctx.verify_mode = ssl.CERT_REQUIRED
1590 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001591 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001592
Christian Heimese82c0342017-09-15 20:29:57 +02001593 # Changing verify_mode does not affect check_hostname
1594 ctx.check_hostname = False
1595 ctx.verify_mode = ssl.CERT_NONE
1596 ctx.check_hostname = False
1597 self.assertFalse(ctx.check_hostname)
1598 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1599 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001600 ctx.check_hostname = True
1601 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001602 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1603
1604 ctx.check_hostname = False
1605 ctx.verify_mode = ssl.CERT_OPTIONAL
1606 ctx.check_hostname = False
1607 self.assertFalse(ctx.check_hostname)
1608 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1609 # keep CERT_OPTIONAL
1610 ctx.check_hostname = True
1611 self.assertTrue(ctx.check_hostname)
1612 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001613
1614 # Cannot set CERT_NONE with check_hostname enabled
1615 with self.assertRaises(ValueError):
1616 ctx.verify_mode = ssl.CERT_NONE
1617 ctx.check_hostname = False
1618 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001619 ctx.verify_mode = ssl.CERT_NONE
1620 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001621
Christian Heimes5fe668c2016-09-12 00:01:11 +02001622 def test_context_client_server(self):
1623 # PROTOCOL_TLS_CLIENT has sane defaults
1624 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1625 self.assertTrue(ctx.check_hostname)
1626 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1627
1628 # PROTOCOL_TLS_SERVER has different but also sane defaults
1629 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1630 self.assertFalse(ctx.check_hostname)
1631 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1632
Christian Heimes4df60f12017-09-15 20:26:05 +02001633 def test_context_custom_class(self):
1634 class MySSLSocket(ssl.SSLSocket):
1635 pass
1636
1637 class MySSLObject(ssl.SSLObject):
1638 pass
1639
1640 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1641 ctx.sslsocket_class = MySSLSocket
1642 ctx.sslobject_class = MySSLObject
1643
1644 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1645 self.assertIsInstance(sock, MySSLSocket)
1646 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1647 self.assertIsInstance(obj, MySSLObject)
1648
@unittest.skipUnless(IS_OPENSSL_1_1_1, "Test requires OpenSSL 1.1.1")
def test_num_tickest(self):
    """Check reading and assigning SSLContext.num_tickets."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_ctx.num_tickets, 2)
    # assignments that are accepted on a server context
    for count in (1, 0):
        server_ctx.num_tickets = count
        self.assertEqual(server_ctx.num_tickets, count)
    # assignments that are rejected, with the expected exception type
    for bad_value, expected_error in ((-1, ValueError), (None, TypeError)):
        with self.assertRaises(expected_error):
            server_ctx.num_tickets = bad_value

    # on a client context the value can be read but not assigned
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.num_tickets, 2)
    with self.assertRaises(ValueError):
        client_ctx.num_tickets = 1
1666
Antoine Pitrou152efa22010-05-16 18:19:27 +00001667
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError, its subclasses and related argument errors."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # the same holds for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = error_class(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        message = str(error)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.create_server(("127.0.0.1", 0)) as listener:
            raw_client = socket.create_connection(listener.getsockname())
            raw_client.setblocking(False)
            with context.wrap_socket(raw_client, server_side=False,
                                     do_handshake_on_connect=False) as tls_client:
                # nothing answers on the listener, so the non-blocking
                # handshake is expected to report that it wants to read
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls_client.do_handshake()
                message = str(caught.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        context = ssl.create_default_context()
        rejected = (
            (ValueError, ""),
            (ValueError, ".example.org"),
            (TypeError, "example.org\x00evil.com"),
        )
        for expected_error, hostname in rejected:
            with self.assertRaises(expected_error):
                context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                                 server_hostname=hostname)
Christian Heimes61d478c2018-01-27 15:51:38 +01001720
1721
class MemoryBIOTests(unittest.TestCase):
    """Tests for the in-memory buffer type ssl.MemoryBIO."""

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # everything written is returned by one read(); the next is empty
        for chunks in ((b'foo',), (b'foo', b'bar')):
            for chunk in chunks:
                buf.write(chunk)
            self.assertEqual(buf.read(), b''.join(chunks))
            self.assertEqual(buf.read(), b'')
        # sized reads return at most the requested number of bytes
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        # an empty read alone does not set eof
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # after write_eof(), eof stays false until the data is drained
        buf.write_eof()
        self.assertFalse(buf.eof)
        steps = (
            ((2,), b'fo', False),
            ((1,), b'o', True),
            ((), b'', True),
        )
        for read_args, data, at_eof in steps:
            self.assertEqual(buf.read(*read_args), data)
            if at_eof:
                self.assertTrue(buf.eof)
            else:
                self.assertFalse(buf.eof)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # pending drops by one for every byte read ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... and grows by one for every byte written
        for queued in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, queued)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        for not_bytes in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_bytes)
1783
1784
Christian Heimes9d50ab52018-02-27 10:17:30 +01001785class SSLObjectTests(unittest.TestCase):
1786 def test_private_init(self):
1787 bio = ssl.MemoryBIO()
1788 with self.assertRaisesRegex(TypeError, "public constructor"):
1789 ssl.SSLObject(bio, bio)
1790
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001791 def test_unwrap(self):
1792 client_ctx, server_ctx, hostname = testing_context()
1793 c_in = ssl.MemoryBIO()
1794 c_out = ssl.MemoryBIO()
1795 s_in = ssl.MemoryBIO()
1796 s_out = ssl.MemoryBIO()
1797 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1798 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1799
1800 # Loop on the handshake for a bit to get it settled
1801 for _ in range(5):
1802 try:
1803 client.do_handshake()
1804 except ssl.SSLWantReadError:
1805 pass
1806 if c_out.pending:
1807 s_in.write(c_out.read())
1808 try:
1809 server.do_handshake()
1810 except ssl.SSLWantReadError:
1811 pass
1812 if s_out.pending:
1813 c_in.write(s_out.read())
1814 # Now the handshakes should be complete (don't raise WantReadError)
1815 client.do_handshake()
1816 server.do_handshake()
1817
1818 # Now if we unwrap one side unilaterally, it should send close-notify
1819 # and raise WantReadError:
1820 with self.assertRaises(ssl.SSLWantReadError):
1821 client.unwrap()
1822
1823 # But server.unwrap() does not raise, because it reads the client's
1824 # close-notify:
1825 s_in.write(c_out.read())
1826 server.unwrap()
1827
1828 # And now that the client gets the server's close-notify, it doesn't
1829 # raise either.
1830 c_in.write(s_out.read())
1831 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001832
Martin Panter3840b2a2016-03-27 01:53:46 +00001833class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001834 """Tests that connect to a simple server running in the background"""
1835
def setUp(self):
    """Create the echo server used by each test and record its address."""
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = HOST, echo_server.port
    # Enter the server's context by hand; the matching exit runs as a
    # cleanup once the test has finished.
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001841
def test_connect(self):
    """Connect without verification, then with the signing CA supplied."""
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock, cert_reqs=ssl.CERT_NONE) as unverified:
        unverified.connect(self.server_addr)
        self.assertEqual({}, unverified.getpeercert())
        self.assertFalse(unverified.server_side)

    # this should succeed because we specify the root cert
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock,
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as verified:
        verified.connect(self.server_addr)
        self.assertTrue(verified.getpeercert())
        self.assertFalse(verified.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001866
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    # connect_ex() reports success as a return code of 0
    status = client.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001875
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED,
                              ca_certs=SIGNING_CA,
                              do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [client], [], 5.0)
    # Non-blocking handshake: retry until it completes, waiting for the
    # direction the SSL layer asks for in between.
    handshake_done = False
    while not handshake_done:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001901
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        self.assertEqual({}, client.getpeercert())
    # Same with a server hostname
    named_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(named_sock, server_hostname="dummy") as client:
        client.connect(self.server_addr)
    # This should succeed because we specify the root cert
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as client:
        client.connect(self.server_addr)
        self.assertTrue(client.getpeercert())
1919
def test_connect_with_context_fail(self):
    # No verification certificates are loaded, so the handshake must
    # fail.  A connection failure crashes ThreadedEchoServer, which is
    # why this lives in its own test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001930
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument.
    # NOTE: the subject hashing algorithm changed between OpenSSL 0.9.8n
    # and 1.0.0, so the capath directory must contain both versions of
    # each certificate (same content, different filename) for this test
    # to be portable across OpenSSL releases.
    #
    # The check runs twice: first with the str path, then with the
    # bytes path.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001953
def test_connect_cadata(self):
    # Verify the server using CA data handed over in memory, first as
    # PEM text and then as the DER bytes converted from it.
    with open(SIGNING_CA) as ca_file:
        pem_data = ca_file.read()
    der_data = ssl.PEM_cert_to_DER_cert(pem_data)
    for cadata in (pem_data, der_data):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001974
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Make sure the socket is released even if connect(), makefile() or
    # one of the checks below fails before the explicit close().
    # Closing an already closed socket is harmless.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open: a zero-length read must not raise.
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001993
def test_non_blocking_handshake(self):
    # Connect a plain socket first, switch it to non-blocking mode and
    # only then wrap it, so the handshake has to be driven manually.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)
    attempts = 0
    finished = False
    while not finished:
        # Every call counts, including the ones that fail with
        # "want read" / "want write".
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [tls_sock], [])
        else:
            finished = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002014
def test_get_server_certificate(self):
    # Fetch the local test server's certificate, with and without
    # verification against the signing CA.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02002017
def test_get_server_certificate_fail(self):
    # A connection failure crashes ThreadedEchoServer, so this check
    # runs in its own test method.
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002022
def test_ciphers(self):
    # Both of these cipher strings must allow a connection.
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)
    # A nonsensical cipher string must be rejected.  Error checking can
    # happen at instantiation or when connecting, so both steps sit
    # inside the assertion.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            conn = test_wrap_socket(raw_sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002036
def test_get_ca_certs_capath(self):
    # Certificates from a capath directory are loaded on request, so
    # none are listed until a connection has actually needed one.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])
    wrapped = context.wrap_socket(socket.socket(socket.AF_INET),
                                  server_hostname='localhost')
    with wrapped as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    self.assertEqual(len(context.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002048
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    contexts = []
    for _ in range(2):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(capath=CAPATH)
        contexts.append(context)
    initial_ctx, replacement_ctx = contexts
    raw_sock = socket.socket(socket.AF_INET)
    with initial_ctx.wrap_socket(raw_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # The socket and its low-level SSL object both start out on the
        # context that created them ...
        self.assertIs(conn.context, initial_ctx)
        self.assertIs(conn._sslobj.context, initial_ctx)
        # ... and both follow the assignment of a new context.
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002064
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling bytes between sock and BIOs.

    *func* is retried for as long as it raises ssl.SSLError with errno
    SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.  After every attempt
    the contents of *outgoing* are sent to *sock*; on WANT_READ, data
    received from *sock* is written to *incoming* (or EOF is signalled
    when the peer sent nothing).  Any other SSLError propagates.

    The only keyword argument consulted is ``timeout`` (seconds,
    default 10); when it expires the test fails via self.fail().
    Returns whatever the successful call to *func* returned.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        # None means "func completed"; otherwise the WANT_* code.
        wanted = None
        try:
            result = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in (ssl.SSL_ERROR_WANT_READ,
                                 ssl.SSL_ERROR_WANT_WRITE):
                raise
            wanted = exc.errno
        # Flush whatever is in the outgoing BIO to the socket,
        # irrespective of whether func succeeded.
        sock.sendall(outgoing.read())
        if wanted is None:
            break
        if wanted == ssl.SSL_ERROR_WANT_READ:
            # Feed the incoming BIO from the socket.
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return result
2101
def test_bio_handshake(self):
    # Perform a full handshake through MemoryBIO objects, moving the
    # bytes over a plain TCP socket with ssl_io_loop().
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing,
                              server_side=False,
                              server_hostname=SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # Writing after the shutdown attempt must fail.
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2135
def test_bio_read_write_data(self):
    # Exchange application data through MemoryBIO objects; the server
    # is expected to answer with the lower-cased request.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming, outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    def pump(operation, *op_args):
        # Shorthand for driving one SSLObject operation to completion.
        return self.ssl_io_loop(sock, incoming, outgoing,
                                operation, *op_args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002151
2152
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the
        # original errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A tiny timeout that the connect is expected to exceed.
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run the success and failure certificate checks against an
        # IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2174
Martin Panter3840b2a2016-03-27 01:53:46 +00002175
def _test_get_server_certificate(test, host, port, cert=None):
    """Check that a certificate can be fetched from (host, port).

    The certificate is fetched twice: once with default arguments and
    once with ``ca_certs=cert``.  *test* is the running TestCase; its
    fail() is called when either fetch returns an empty result.
    """
    address = (host, port)
    if not ssl.get_server_certificate(address):
        test.fail("No server certificate on %s:%s!" % (host, port))

    verified_pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not verified_pem:
        test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, verified_pem))
2186
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA fails.

    ssl.get_server_certificate() is expected to raise ssl.SSLError; if
    it returns a certificate instead, *test*.fail() is called.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % err)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2196
2197
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002198from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002199
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used as a test fixture.

    Connections are accepted one at a time; each one is served by a
    ConnectionHandler thread that the server joins before accepting the
    next.  Data received is sent back lower-cased.  Usable as a context
    manager: entering starts the thread and waits until it is listening,
    leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(True)
            # None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock using the server's SSL context.

            Returns True on success.  On failure the error text is
            appended to server.conn_errors, the connection is closed and
            False is returned; for SSLError/OSError other than the
            connection reset/abort/broken-pipe family the server is
            stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # ConnectionAbortedError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake when using WinSock.
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an error.

            Unless the server is a STARTTLS server the connection is
            wrapped immediately.  Recognised commands (after stripping
            whitespace): b'over', b'STARTTLS', b'ENDTLS',
            b'CB tls-unique', b'PHA', b'HASCERT' and b'GETCERT'.
            Anything else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # Only attempt a TLS shutdown when the connection
                        # is wrapped.  On a STARTTLS server it may still
                        # be (or be back to) plain text, in which case
                        # sslconn is None and unwrap() would raise an
                        # uncaught AttributeError, killing this thread
                        # without closing the socket.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                                self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except (ConnectionResetError, ConnectionAbortedError):
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except ssl.SSLError as err:
                    # On Windows sometimes test_pha_required_nocert receives the
                    # PEER_DID_NOT_RETURN_A_CERTIFICATE exception
                    # before the 'tlsv13 alert certificate required' exception.
                    # If the server is stopped when PEER_DID_NOT_RETURN_A_CERTIFICATE
                    # is received test_pha_required_nocert fails with ConnectionResetError
                    # because the underlying socket is closed
                    if 'PEER_DID_NOT_RETURN_A_CERTIFICATE' == err.reason:
                        if self.server.chatty and support.verbose:
                            sys.stdout.write(err.args[1])
                        # test_pha_required_nocert is expecting this exception
                        raise ssl.SSLError('tlsv13 alert certificate required')
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server socket and SSL context.

        If *context* is given it is used as-is and the *ssl_version*,
        *certreqs*, *cacerts* and *certificate* arguments are ignored.
        Otherwise a context is built from them, defaulting to
        PROTOCOL_TLS_SERVER and CERT_NONE.  *npn_protocols*,
        *alpn_protocols* and *ciphers* are applied to the context in
        either case.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results recorded by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait until run() signals that it is
        # listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections until stop() is called."""
        # The short accept timeout lets the loop notice self.active
        # being cleared.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to exit."""
        self.active = False
2476
2477class AsyncoreEchoServer(threading.Thread):
2478
2479 # this one's based on asyncore.dispatcher
2480
2481 class EchoServer (asyncore.dispatcher):
2482
2483 class ConnectionHandler(asyncore.dispatcher_with_send):
2484
2485 def __init__(self, conn, certfile):
2486 self.socket = test_wrap_socket(conn, server_side=True,
2487 certfile=certfile,
2488 do_handshake_on_connect=False)
2489 asyncore.dispatcher_with_send.__init__(self, self.socket)
2490 self._ssl_accepting = True
2491 self._do_ssl_handshake()
2492
2493 def readable(self):
2494 if isinstance(self.socket, ssl.SSLSocket):
2495 while self.socket.pending() > 0:
2496 self.handle_read_event()
2497 return True
2498
2499 def _do_ssl_handshake(self):
2500 try:
2501 self.socket.do_handshake()
2502 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2503 return
2504 except ssl.SSLEOFError:
2505 return self.handle_close()
2506 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002507 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002508 except OSError as err:
2509 if err.args[0] == errno.ECONNABORTED:
2510 return self.handle_close()
2511 else:
2512 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002513
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002514 def handle_read(self):
2515 if self._ssl_accepting:
2516 self._do_ssl_handshake()
2517 else:
2518 data = self.recv(1024)
2519 if support.verbose:
2520 sys.stdout.write(" server: read %s from client\n" % repr(data))
2521 if not data:
2522 self.close()
2523 else:
2524 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002525
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002526 def handle_close(self):
2527 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002528 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002530
2531 def handle_error(self):
2532 raise
2533
Trent Nelson78520002008-04-10 20:54:35 +00002534 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535 self.certfile = certfile
2536 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2537 self.port = support.bind_port(sock, '')
2538 asyncore.dispatcher.__init__(self, sock)
2539 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002540
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002541 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002542 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002543 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2544 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002545
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002546 def handle_error(self):
2547 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002548
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002549 def __init__(self, certfile):
2550 self.flag = None
2551 self.active = False
2552 self.server = self.EchoServer(certfile)
2553 self.port = self.server.port
2554 threading.Thread.__init__(self)
2555 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002556
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 def __str__(self):
2558 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002559
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002560 def __enter__(self):
2561 self.start(threading.Event())
2562 self.flag.wait()
2563 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002564
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002565 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002566 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002567 sys.stdout.write(" cleanup: stopping server.\n")
2568 self.stop()
2569 if support.verbose:
2570 sys.stdout.write(" cleanup: joining server thread.\n")
2571 self.join()
2572 if support.verbose:
2573 sys.stdout.write(" cleanup: successfully joined.\n")
2574 # make sure that ConnectionHandler is removed from socket_map
2575 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002576
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002577 def start (self, flag=None):
2578 self.flag = flag
2579 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002580
def run(self):
    """Thread body: pump the asyncore loop until ``self.active`` is cleared."""
    self.active = True
    ready = self.flag
    if ready:
        # Tell whoever is waiting (see __enter__) that we are running.
        ready.set()
    while self.active:
        try:
            asyncore.loop(1)
        except:
            # Deliberately best-effort: any error raised out of the loop
            # is ignored and the loop is simply entered again.
            pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002590
def stop(self):
    """Ask run() to finish and close the listening server."""
    # Clearing the flag first lets the while-loop in run() terminate.
    self.active = False
    server = self.server
    server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002594
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Run one echo round-trip between a client and a ThreadedEchoServer.

    *indata* is written three times (as bytes, bytearray and memoryview)
    and each reply must equal ``indata.lower()``; otherwise AssertionError
    is raised.  *chatty* is forwarded to the server; *connectionchatty*
    only controls the client-side verbose output here (the server is
    always created with connectionchatty=False).  *sni_name* and *session*
    are passed to ``client_context.wrap_socket``.

    Returns a dict with connection details collected from the client
    socket and from the server object.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo server that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Client-side view of the negotiated connection.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        # Server-side view, read from the server object.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002644
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server and check the outcome.

    A true *expect_success* means the connection must work, a false one
    means it must fail (AssertionError is raised otherwise).  When
    *expect_success* is a string it must additionally equal the protocol
    version reported by the connection.  *certsreqs* defaults to
    ssl.CERT_NONE; the two *_options* values are OR-ed into the
    respective context's options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are printed inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    if (min_version is not None
            and hasattr(server_context, 'minimum_version')
            and server_protocol == ssl.PROTOCOL_TLS):
        if server_context.minimum_version > min_version:
            # If OpenSSL configuration is strict and requires more recent
            # TLS version, we have to change the minimum to test old TLS
            # versions.
            server_context.minimum_version = min_version

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(SIGNED_CERTFILE)
        ctx.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        tolerated = not expect_success and e.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2714
2715
2716class ThreadedTests(unittest.TestCase):
2717
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # The role-specific protocols cannot share one context for both
    # sides, so they are exercised separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol not in role_specific:
            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(CERTFILE)
                server_params_test(context, context,
                                   chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False
    misuse_message = 'called a function you should not call'
    # (client label, server label, context used as client, context used
    #  as server, extra keyword arguments)
    # NOTE(review): the last row is labelled CLIENT/CLIENT but passes the
    # same contexts as the first row (server_context as the client side).
    wrong_role_cases = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in wrong_role_cases:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn(misuse_message, str(caught.exception))
2764
def test_getpeercert(self):
    # Checks getpeercert() before and after an explicit handshake, then
    # sanity-checks the subject and validity fields of the certificate.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raise ValueError while the handshake isn't
        # done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")
        self.assertIn('notBefore', cert)
        self.assertIn('notAfter', cert)
        not_before = ssl.cert_time_to_seconds(cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002800
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Verifies the peer with default flags, then with CRL checking
    # enabled but no CRL loaded (must fail), then with the CRL loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def connect_and_get_cert():
        # Fresh server per attempt; the handshake is expected to succeed.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    # VERIFY_DEFAULT should pass
    connect_and_get_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    connect_and_get_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002842
def test_check_hostname(self):
    # Exercises hostname checking with a matching name, a mismatching
    # name, and no server_hostname at all.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as s:
        mismatch = "Hostname mismatch, certificate is not valid for 'invalid'."
        with self.assertRaisesRegex(ssl.CertificateError, mismatch):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as s:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002875
def test_ecc_cert(self):
    # Connects to a server that only has an ECC certificate and checks
    # that the peer verifies and that an ECDHE-ECDSA suite was negotiated.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The old assertTrue(cipher[:2], (...)) passed the expected
            # value as the failure *message*, so nothing was compared.
            # str.split() returns a list, hence the list literal.
            # TLS 1.3 suite names (e.g. TLS_AES_256_GCM_SHA384) carry no
            # key-exchange/authentication prefix and are not governed by
            # set_ciphers(), so the name check only applies below 1.3.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2896
def test_dual_rsa_ecc(self):
    # The server offers both an ECC and an RSA certificate; a client
    # restricted to ECDSA suites must end up on an ECDHE-ECDSA cipher.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The old assertTrue(cipher[:2], (...)) passed the expected
            # value as the failure *message*, so nothing was compared.
            # str.split() returns a list, hence the list literal.  TLS 1.3
            # is disabled above, so the suite name has the classic
            # KEX-AUTH-... form.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2922
def test_check_hostname_idn(self):
    # Hostname checking with internationalized names given as unicode,
    # as A-label str and as A-label bytes.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways
    koenig = 'xn--knig-5qa.idn.pythontest.net'
    idna2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'
    # (name handed to wrap_socket, expected SSLSocket.server_hostname)
    idn_hostnames = [
        ('könig.idn.pythontest.net', koenig),
        ('xn--knig-5qa.idn.pythontest.net', koenig),
        (b'xn--knig-5qa.idn.pythontest.net', koenig),

        ('königsgäßchen.idna2003.pythontest.net', idna2003),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net', idna2003),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net', idna2003),

        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net', idna2008),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net', idna2008),
    ]
    for server_hostname, expected_hostname in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, context.wrap_socket(
                socket.socket(), server_hostname=server_hostname) as s:
            # The name is checked both before and after connecting.
            self.assertEqual(s.server_hostname, expected_hostname)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_hostname)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(
            socket.socket(), server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2978
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    The server requires a client certificate (CERT_REQUIRED) and the
    client presents one that is not signed by a trusted CA, so the
    connection attempt has to fail.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % e)
            except OSError as e:
                if e.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
3015
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    # TLS 1.3 variant of the wrong-client-certificate test: the failure
    # is expected on the first write/read after connect(), not in
    # connect() itself.
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as e:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % e)
            except OSError as e:
                if e.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
3046
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    def listener():
        # Runs in a helper thread: accept one connection, close it and
        # the listening socket right away, then signal `listener_gone`
        # so the main thread knows the socket has disappeared.
        s.listen()
        listener_ready.set()
        newsock, addr = s.accept()
        newsock.close()
        s.close()
        listener_gone.set()

    def connector():
        # Runs in the main thread: connect once the listener is ready
        # and try to wrap the socket only after the peer is gone.
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003087
def test_ssl_cert_verify_error(self):
    # The client context has no CA loaded, so verifying the server
    # certificate must fail with a detailed SSLCertVerificationError.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail when connect() succeeds;
            # the former try/except had no else branch and passed
            # silently in that case.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
3110
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    # SSLv2 client against SSLv2 server must work for every cert mode
    # (None means "use try_protocol_combo's default").
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003133
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols for each certificate mode; None means
    # "use try_protocol_combo's default".
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if have_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, certsreqs)
        try_protocol_combo(tls, tls, True, certsreqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if have_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3171
3172
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # SSLv3 on both sides must negotiate 'SSLv3' for every cert mode
    # (None means "use try_protocol_combo's default").
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3191
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 on both sides must negotiate 'TLSv1' for every cert mode
    # (None means "use try_protocol_combo's default").
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)
    # Legacy SSL clients are only tried when the build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3205
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Legacy SSL clients are only tried when the build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # A TLSv1.1 client reaches a PROTOCOL_TLS server, but TLSv1 and
    # TLSv1.1 do not interoperate in either direction.
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3224
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connect to a TLSv1.2 server using a variety of client settings,
    including clients limited to older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    # Same option mask is applied on both the server and the client side.
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tls12, tls12, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # SSLv2 / SSLv3 clients are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tls12, getattr(ssl, legacy), False)
    # A PROTOCOL_TLS client that has TLSv1.2 switched off.
    try_protocol_combo(tls12, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # PROTOCOL_TLS server with a TLSv1.2-only client.
    try_protocol_combo(ssl.PROTOCOL_TLS, tls12, 'TLSv1.2')
    # TLSv1.2 paired with TLSv1 and then TLSv1.1, in both directions.
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tls12, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tls12, False)
3247
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed script of messages to a ThreadedEchoServer created with
    starttls_server=True.  When the reply to b"STARTTLS" starts with
    b"ok" the client socket is wrapped with test_wrap_socket(); when the
    reply to b"ENDTLS" starts with b"ok" it is unwrapped again.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
            b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through the TLS-wrapped socket `conn`.
    wrapped = False
    with server:
        s = socket.socket()
        # Close whichever socket is active even if the exchange fails,
        # so a failing run does not leak the client socket.
        try:
            s.setblocking(True)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    conn = test_wrap_socket(s)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
        finally:
            if wrapped:
                conn.close()
            else:
                s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003304
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Reads CERTFILE from disk, requests a URL whose path is CERTFILE's
    base name from the server returned by make_https_server(), and
    checks that both payloads are equal.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Bytes, like d1: if the response carries no usable content-length
    # the final comparison is still bytes against bytes.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response object is a context manager; it is closed on exit
    # whether or not reading succeeds.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003330
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over a wrapped socket and
    fails unless the reply equals the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        # Close the client socket even when the echo check fails, so a
        # failing run does not leak it.
        try:
            s.connect(('127.0.0.1', server.port))
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            s.write(indata)
            outdata = s.read()
            if support.verbose:
                sys.stdout.write(" client: read %r\n" % outdata)
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        finally:
            s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003359
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv variant of the wrapped client socket is driven against
    a ThreadedEchoServer.  Variants flagged as expected to succeed must
    round-trip the lower-cased payload; the others must raise ValueError
    with a message that starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    # "!r" is a conversion; ":r" would be a format spec,
                    # which bytes objects reject with TypeError.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # "!s" converts the exception to text; ":s" would be
                    # a format spec, which exceptions reject.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was already sent before the
                # receive call raised, so its echo is still pending
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3496
def test_recv_zero(self):
    # Zero-length reads on a wrapped socket return empty results, both
    # with data pending and in non-blocking mode.
    echo_server = ThreadedEchoServer(CERTFILE)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None)
    plain = socket.create_connection((HOST, echo_server.port))
    self.addCleanup(plain.close)
    s = test_wrap_socket(plain, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    for zero_length_read in (s.recv, s.read):
        self.assertEqual(zero_length_read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    empty_target = bytearray()
    self.assertEqual(s.recv_into(empty_target), 0)
3516
def test_nonblocking_send(self):
    # A non-blocking wrapped socket must raise SSLWantWriteError or
    # SSLWantReadError once send() can no longer make progress.
    server_kwargs = dict(certreqs=ssl.CERT_NONE,
                         ssl_version=ssl.PROTOCOL_TLS_SERVER,
                         cacerts=CERTFILE,
                         chatty=True,
                         connectionchatty=False)
    with ThreadedEchoServer(CERTFILE, **server_kwargs) as server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        payload = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                s.send(payload)

        # Switch back to blocking mode before closing.
        s.setblocking(True)
        s.close()
3546
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    started = threading.Event()
    # Flipped to True by the main thread to stop the accept loop below.
    finish = False

    def serve():
        # Accept plain TCP connections and never start a TLS handshake.
        listener.listen()
        started.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for sock in accepted:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: connect a plain socket, then wrap it.
        with socket.socket(socket.AF_INET) as c:
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        # Case 2: wrap first, then connect.
        c = socket.socket(socket.AF_INET)
        try:
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        listener.close()
3595
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    host = "127.0.0.1"
    listener = socket.socket(socket.AF_INET)
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    ready = threading.Event()
    # Filled in by the server thread: [accepted socket, peer address].
    accepted = [None, None]

    def serve():
        listener.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        accepted[:] = listener.accept()
        accepted[0].send(accepted[0].recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote, peer = accepted
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3636
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # raise OSError carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        error_code = caught.exception.errno
        self.assertEqual(error_code, errno.ENOTCONN)
3643
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # raise OSError carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        error_code = caught.exception.errno
        self.assertEqual(error_code, errno.ENOTCONN)
3650
def test_no_shared_ciphers(self):
    client_ctx, server_ctx, sni = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_ctx.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    for ctx, suite in ((client_ctx, "AES128"), (server_ctx, "AES256")):
        ctx.set_ciphers(suite)
    with ThreadedEchoServer(context=server_ctx) as server:
        client_sock = client_ctx.wrap_socket(socket.socket(),
                                             server_hostname=sni)
        with client_sock as s:
            self.assertRaises(OSError, s.connect, (HOST, server.port))
    # The server records the handshake failure it saw.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3664
def test_version_basic(self):
    """
    Basic checks of SSLSocket.version() before connecting, while
    connected, and after the socket has been closed.
    Further coverage lives in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    server_kwargs = dict(ssl_version=ssl.PROTOCOL_TLS_SERVER, chatty=False)
    with ThreadedEchoServer(CERTFILE, **server_kwargs) as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            # Nothing negotiated before connect().
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:
                # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # The inner "with" has closed the socket at this point.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3688
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled on a context shared by both peers, the
    # connection must report TLSv1.3 and one of its cipher suites.
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    older_tls_off = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= older_tls_off
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            suite_name = s.cipher()[0]
            self.assertIn(suite_name, tls13_suites)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003706
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    client_context, server_context, hostname = testing_context()
    versions = ssl.TLSVersion

    def assert_negotiates(expected):
        # Connect with the contexts as currently configured and check
        # the version reported by the client socket.
        with ThreadedEchoServer(context=server_context) as server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                self.assertEqual(s.version(), expected)

    # client TLSv1.0 to 1.2
    client_context.minimum_version = versions.TLSv1
    client_context.maximum_version = versions.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = versions.TLSv1_2
    server_context.maximum_version = versions.TLSv1_2
    assert_negotiates('TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = versions.TLSv1
    server_context.maximum_version = versions.TLSv1_1
    assert_negotiates('TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.maximum_version = versions.TLSv1_2
    server_context.minimum_version = versions.TLSv1_2
    client_context.maximum_version = versions.TLSv1
    client_context.minimum_version = versions.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3745
3746
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # Server accepts SSLv3 as its minimum; the client is pinned to
    # exactly SSLv3, so that is the version that must be negotiated.
    sslv3 = ssl.TLSVersion.SSLv3
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.minimum_version = sslv3
    client_ctx.minimum_version = sslv3
    client_ctx.maximum_version = sslv3
    with ThreadedEchoServer(context=server_ctx) as server:
        client_sock = client_ctx.wrap_socket(socket.socket(),
                                             server_hostname=sni)
        with client_sock as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3760
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            negotiated_cipher = s.cipher()[0]
            self.assertIn("ECDH", negotiated_cipher)
3780
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Opens two successive connections to the same ThreadedEchoServer.  On
    each one the client's binding data must be non-None, have the
    expected length, and match the value the server returns for the
    b"CB tls-unique" request; the two connections must yield different
    data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(cb_data), 48)
            else:
                self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the value from THIS connection; cb_data was
            # already validated on the first connection above.
            self.assertIsNotNone(new_cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(new_cb_data), 48)
            else:
                self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003840
def test_compression(self):
    # The compression reported by server_params_test() must be None,
    # 'ZLIB' or 'RLE'.
    allowed = { None, 'ZLIB', 'RLE' }
    client_ctx, server_ctx, sni = testing_context()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=sni)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    self.assertIn(stats['compression'], allowed)
3849
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on both contexts, no compression may be
    # reported.
    client_ctx, server_ctx, sni = testing_context()
    for ctx in (client_ctx, server_ctx):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=sni)
    compression = stats['compression']
    self.assertIs(compression, None)
3860
@unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the "cipher" entry is the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    # The name must contain one of the DH key-exchange tokens as a
    # whole "-"-separated component.
    if not any(token in parts for token in ("ADH", "EDH", "DHE")):
        # Report the full cipher name (cipher[0] would be only its first
        # character, since cipher is already a string).
        self.fail("Non-DH cipher: " + cipher)
3877
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    ecdhe_only = "ECDHE:!eNULL:!aNULL"
    chatty_kwargs = dict(chatty=True, connectionchatty=True)

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    server_params_test(client_context, server_context,
                       sni_name=hostname, **chatty_kwargs)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    server_params_test(client_context, server_context,
                       sni_name=hostname, **chatty_kwargs)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers(ecdhe_only)
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    try:
        server_params_test(client_context, server_context,
                           sni_name=hostname, **chatty_kwargs)
    except ssl.SSLError:
        # The mismatch was rejected; nothing more to check.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3916
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_ctx, server_ctx, sni = testing_context()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=sni)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3924
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    offered_by_server = ['foo', 'bar']
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.set_alpn_protocols(offered_by_server)
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True,
                               sni_name=sni)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3934
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # Each case is (protocols set on the client, protocol expected on
    # both sides after a handshake against server_protocols).
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    # OpenSSL 1.1.0 to 1.1.0e raises handshake error
    handshake_error_on_mismatch = (
        IS_OPENSSL_1_1_0 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))

    for client_protocols, expected in protocol_tests:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_alpn_protocols(server_protocols)
        client_ctx.set_alpn_protocols(client_protocols)

        # Keep the SSLError (if any) so that it can be inspected below.
        try:
            outcome = server_params_test(
                client_ctx, server_ctx,
                chatty=True, connectionchatty=True, sni_name=sni)
        except ssl.SSLError as exc:
            outcome = exc

        if expected is None and handshake_error_on_mismatch:
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))

        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The last recorded server-side value is the one compared;
        # 'nothing' stands in when the server recorded no value at all.
        recorded = outcome['server_alpn_protocols']
        server_result = recorded[-1] if len(recorded) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3974
def test_selected_npn_protocol(self):
    # No NPN protocols are configured on either context, so the
    # 'client_npn_protocol' entry of the collected stats must be None.
    client_ctx, server_ctx, sni = testing_context()
    results = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True, sni_name=sni)
    self.assertIsNone(results['client_npn_protocol'])
3982
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Each case is (protocols set on the client, protocol expected on
    # both sides after a handshake against server_protocols).
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx.set_npn_protocols(client_protocols)
        results = server_params_test(
            client_ctx, server_ctx,
            chatty=True, connectionchatty=True, sni_name=sni)

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))

        client_result = results['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The last recorded server-side value is the one compared;
        # 'nothing' stands in when the server recorded no value at all.
        recorded = results['server_npn_protocols']
        server_result = recorded[-1] if len(recorded) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
4008
def sni_contexts(self):
    """Return (server_context, other_context, client_context) for SNI tests.

    Both server-side contexts use PROTOCOL_TLS_SERVER; the first loads
    SIGNED_CERTFILE and the second SIGNED_CERTFILE2.  The client context
    uses PROTOCOL_TLS_CLIENT and loads SIGNING_CA as verify location.
    """
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = server_side(SIGNED_CERTFILE)
    alternate = server_side(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
4017
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    The certificate is read from stats['peercert']; its 'subject' entry
    must contain the one-element RDN (('commonName', name),).
    """
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
4021
@needs_sni
def test_sni_callback(self):
    # The servername callback receives the requested name together
    # with the listening context, and may switch the socket over to
    # another context.
    server_ctx, alt_ctx, client_ctx = self.sni_contexts()
    client_ctx.check_hostname = False
    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = alt_ctx
    server_ctx.set_servername_callback(servername_cb)

    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection: the peer certificate now carries
    # the 'fakehostname' common name.
    self.assertEqual(seen, [("supermessage", server_ctx)])
    self.check_common_name(stats, 'fakehostname')

    # Without SNI the callback is still called, with server_name=None,
    # and the certificate of the listening context is kept.
    seen.clear()
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_ctx)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    seen.clear()
    server_ctx.set_servername_callback(None)

    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4062
@needs_sni
def test_sni_callback_alert(self):
    # An alert description returned by the servername callback must
    # surface on the connecting client as an SSLError with the
    # matching reason.
    server_ctx, _alt_ctx, client_ctx = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_ctx.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as raised:
        server_params_test(client_ctx, server_ctx,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(raised.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004076
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_ctx, _alt_ctx, client_ctx = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0
    server_ctx.set_servername_callback(cb_raising)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as raised:
            server_params_test(client_ctx, server_ctx,
                               chatty=False,
                               sni_name='supermessage')

        # Both checks stay inside the catch_unraisable_exception()
        # block, where catch.unraisable is read.
        self.assertEqual(raised.exception.reason,
                         'SSLV3_ALERT_HANDSHAKE_FAILURE')
        self.assertEqual(catch.unraisable.exc_type, ZeroDivisionError)
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004095
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_ctx, _alt_ctx, client_ctx = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_ctx.set_servername_callback(cb_wrong_return_type)

    with support.catch_unraisable_exception() as catch:
        with self.assertRaises(ssl.SSLError) as raised:
            server_params_test(client_ctx, server_ctx,
                               chatty=False,
                               sni_name='supermessage')

        # Both checks stay inside the catch_unraisable_exception()
        # block, where catch.unraisable is read.
        self.assertEqual(raised.exception.reason,
                         'TLSV1_ALERT_INTERNAL_ERROR')
        self.assertEqual(catch.unraisable.exc_type, TypeError)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004115
def test_shared_ciphers(self):
    # The client allows AES128 and AES256 while the server allows
    # only AES256; every cipher the server reports as shared must
    # match one of the substrings in expected_algs.
    client_ctx, server_ctx, sni = testing_context()
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    expected_algs = [
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    ]

    results = server_params_test(client_ctx, server_ctx, sni_name=sni)
    shared = results['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        if any(alg in cipher_name for alg in expected_algs):
            continue
        self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004133
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on a wrapped socket that was connected and
    # then closed must raise ValueError.
    client_ctx, server_ctx, sni = testing_context()
    server = ThreadedEchoServer(context=server_ctx, chatty=False)

    with server:
        sock = client_ctx.wrap_socket(socket.socket(),
                                      server_hostname=sni)
        sock.connect((HOST, server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004146
def test_sendfile(self):
    # Send a small file through sendfile() and expect the same bytes
    # back from the server in a single recv().
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # The same context is used for the server and for the client.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=ctx, chatty=False)
    with server, ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as src:
            s.sendfile(src)
        self.assertEqual(s.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004163
def test_session(self):
    # Four handshakes against the same server context: new session,
    # reuse, new session, reuse.  The server's 'accept' and 'hits'
    # counters are checked after each of them.
    client_ctx, server_ctx, sni = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_ctx.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # Run one client/server exchange and return its stats.
        return server_params_test(client_ctx, server_ctx,
                                  sni_name=sni, **extra)

    def check_server_counters(accept, hits):
        counters = server_ctx.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    stats = handshake()
    first = stats['session']
    self.assertTrue(first.id)
    self.assertGreater(first.time, 0)
    self.assertGreater(first.timeout, 0)
    self.assertTrue(first.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(first.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    stats = handshake(session=first)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, first.id)
    # Equal to the original session, yet a distinct object.
    self.assertEqual(resumed, first)
    self.assertIsNot(resumed, first)
    self.assertGreaterEqual(resumed.time, first.time)
    self.assertGreaterEqual(resumed.timeout, first.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, first.id)
    self.assertNotEqual(fresh, first)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    stats = handshake(session=first)
    self.assertTrue(stats['session_reused'])
    again = stats['session']
    self.assertEqual(again.id, first.id)
    self.assertEqual(again, first)
    self.assertGreaterEqual(again.time, first.time)
    self.assertGreaterEqual(again.timeout, first.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004221
def test_session_handling(self):
    # Exercise the session attribute of client sockets: its value
    # before the handshake, type checking in the setter, rejection
    # after the handshake, reuse through the same context, and
    # rejection of a session that belongs to another context.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment is guarded: it is the statement that
            # must raise, so a connect() placed after it inside the
            # block could never run in a passing test.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004271
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004272
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    # Tests for post-handshake authentication (PHA).  Most tests drive
    # a ThreadedEchoServer with the b'HASCERT', b'PHA' and b'GETCERT'
    # requests and check the replies it sends back.

    @staticmethod
    def _request(sock, command, bufsize=1024):
        # Write one command to the server and return its reply.
        sock.write(command)
        return sock.recv(bufsize)

    def test_pha_setter(self):
        # post_handshake_auth starts out False and can be changed
        # without disturbing verify_mode, for each protocol constant.
        for protocol in (ssl.PROTOCOL_TLS,
                         ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            ctx = ssl.SSLContext(protocol)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        # Both sides enable PHA and the client has a certificate: the
        # server reports no client cert until b'PHA' has been sent.
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.post_handshake_auth = True
        client_ctx.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            self.assertEqual(self._request(s, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            self.assertEqual(self._request(s, b'HASCERT'), b'TRUE\n')
            # PHA method just returns true when cert is already available
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            cert_text = self._request(s, b'GETCERT', 4096).decode('us-ascii')
            self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        # Same as test_pha_required, but the client loads no
        # certificate, so the exchange must end with an alert.
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.post_handshake_auth = True

        # Ignore expected SSLError in ConnectionHandler of ThreadedEchoServer
        # (it is only raised sometimes on Windows)
        with support.catch_threading_exception() as cm:
            server = ThreadedEchoServer(context=server_ctx, chatty=False)
            with server, client_ctx.wrap_socket(
                    socket.socket(), server_hostname=sni) as s:
                s.connect((HOST, server.port))
                # receive CertificateRequest
                self.assertEqual(self._request(s, b'PHA'), b'OK\n')
                # send empty Certificate + Finish
                s.write(b'HASCERT')
                # receive alert
                with self.assertRaisesRegex(
                        ssl.SSLError,
                        'tlsv13 alert certificate required'):
                    s.recv(1024)

    def test_pha_optional(self):
        if support.verbose:
            sys.stdout.write("\n")

        client_ctx, server_ctx, sni = testing_context()
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.post_handshake_auth = True
        client_ctx.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        server_ctx.verify_mode = ssl.CERT_OPTIONAL
        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            self.assertEqual(self._request(s, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            self.assertEqual(self._request(s, b'HASCERT'), b'TRUE\n')

    def test_pha_optional_nocert(self):
        if support.verbose:
            sys.stdout.write("\n")

        client_ctx, server_ctx, sni = testing_context()
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_OPTIONAL
        client_ctx.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            self.assertEqual(self._request(s, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            # optional doesn't fail when client does not have a cert
            self.assertEqual(self._request(s, b'HASCERT'), b'FALSE\n')

    def test_pha_no_pha_client(self):
        # The client does not enable post_handshake_auth: calling
        # verify_client_post_handshake() on the client socket fails,
        # and the server's reply to b'PHA' reports the missing extension.
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                s.verify_client_post_handshake()
            self.assertIn(b'extension not received',
                          self._request(s, b'PHA'))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.post_handshake_auth = True
        client_ctx.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            self.assertEqual(self._request(s, b'HASCERT'), b'TRUE\n')
            # PHA doesn't fail if there is already a cert
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            self.assertEqual(self._request(s, b'HASCERT'), b'TRUE\n')

    def test_pha_not_tls13(self):
        # TLS 1.2
        client_ctx, server_ctx, sni = testing_context()
        server_ctx.verify_mode = ssl.CERT_REQUIRED
        client_ctx.maximum_version = ssl.TLSVersion.TLSv1_2
        client_ctx.post_handshake_auth = True
        client_ctx.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            # PHA fails for TLS != 1.3
            self.assertIn(b'WRONG_SSL_VERSION', self._request(s, b'PHA'))

    def test_bpo37428_pha_cert_none(self):
        # verify that post_handshake_auth does not implicitly enable cert
        # validation.
        sni = SIGNED_CERTFILE_HOSTNAME

        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        client_ctx.post_handshake_auth = True
        client_ctx.load_cert_chain(SIGNED_CERTFILE)
        # no cert validation and CA on client side
        client_ctx.check_hostname = False
        client_ctx.verify_mode = ssl.CERT_NONE

        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        server_ctx.load_cert_chain(SIGNED_CERTFILE)
        server_ctx.load_verify_locations(SIGNING_CA)
        server_ctx.post_handshake_auth = True
        server_ctx.verify_mode = ssl.CERT_REQUIRED

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))
            self.assertEqual(self._request(s, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._request(s, b'PHA'), b'OK\n')
            self.assertEqual(self._request(s, b'HASCERT'), b'TRUE\n')
            # server cert has not been validated
            self.assertEqual(s.getpeercert(), {})
4478
Christian Heimes9fb051f2018-09-23 08:32:31 +02004479
# Keylog tests are skipped when ssl.SSLContext has no keylog_filename
# attribute.
HAS_KEYLOG = hasattr(ssl.SSLContext, 'keylog_filename')
requires_keylog = unittest.skipUnless(
    HAS_KEYLOG,
    'test requires OpenSSL 1.1.1 with keylog callback',
)
4483
class TestSSLDebug(unittest.TestCase):
    # Tests for SSLContext.keylog_filename and SSLContext._msg_callback.

    def keylog_lines(self, fname=support.TESTFN):
        # Return the number of lines in the file *fname*.
        with open(fname) as keylog:
            return sum(1 for _line in keylog)

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_defaults(self):
        self.addCleanup(support.unlink, support.TESTFN)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        self.assertIsNone(context.keylog_filename)

        # Assigning a file name creates the file, which then holds
        # exactly one line.
        self.assertFalse(os.path.isfile(support.TESTFN))
        context.keylog_filename = support.TESTFN
        self.assertEqual(context.keylog_filename, support.TESTFN)
        self.assertTrue(os.path.isfile(support.TESTFN))
        self.assertEqual(self.keylog_lines(), 1)

        context.keylog_filename = None
        self.assertIsNone(context.keylog_filename)

        parent_dir = os.path.dirname(os.path.abspath(support.TESTFN))
        with self.assertRaises((IsADirectoryError, PermissionError)):
            # Windows raises PermissionError
            context.keylog_filename = parent_dir

        with self.assertRaises(TypeError):
            context.keylog_filename = 1

    @requires_keylog
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_filename(self):
        self.addCleanup(support.unlink, support.TESTFN)
        client_ctx, server_ctx, sni = testing_context()

        def handshake():
            # Start a fresh echo server and connect to it once.
            server = ThreadedEchoServer(context=server_ctx, chatty=False)
            with server, client_ctx.wrap_socket(
                    socket.socket(), server_hostname=sni) as s:
                s.connect((HOST, server.port))

        # Client side only.
        client_ctx.keylog_filename = support.TESTFN
        handshake()
        # header, 5 lines for TLS 1.3
        self.assertEqual(self.keylog_lines(), 6)

        # Server side only; the same file keeps growing.
        client_ctx.keylog_filename = None
        server_ctx.keylog_filename = support.TESTFN
        handshake()
        self.assertGreaterEqual(self.keylog_lines(), 11)

        # Both sides log to the same file.
        client_ctx.keylog_filename = support.TESTFN
        server_ctx.keylog_filename = support.TESTFN
        handshake()
        self.assertGreaterEqual(self.keylog_lines(), 21)

        client_ctx.keylog_filename = None
        server_ctx.keylog_filename = None

    @requires_keylog
    @unittest.skipIf(sys.flags.ignore_environment,
                     "test is not compatible with ignore_environment")
    @unittest.skipIf(Py_DEBUG_WIN32, "Avoid mixing debug/release CRT on Windows")
    def test_keylog_env(self):
        self.addCleanup(support.unlink, support.TESTFN)
        # patch.dict restores os.environ when the block is left.
        with unittest.mock.patch.dict(os.environ):
            os.environ['SSLKEYLOGFILE'] = support.TESTFN
            self.assertEqual(os.environ['SSLKEYLOGFILE'], support.TESTFN)

            # A plain SSLContext has no keylog file set ...
            plain = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            self.assertIsNone(plain.keylog_filename)

            # ... whereas both context factories pick up SSLKEYLOGFILE.
            default = ssl.create_default_context()
            self.assertEqual(default.keylog_filename, support.TESTFN)

            stdlib = ssl._create_stdlib_context()
            self.assertEqual(stdlib.keylog_filename, support.TESTFN)

    def test_msg_callback(self):
        # _msg_callback is None by default, returns the function that
        # was assigned, and rejects a plain object() with TypeError.
        client_ctx, _server_ctx, _sni = testing_context()

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            pass

        self.assertIsNone(client_ctx._msg_callback)
        client_ctx._msg_callback = msg_cb
        self.assertIs(client_ctx._msg_callback, msg_cb)
        with self.assertRaises(TypeError):
            client_ctx._msg_callback = object()

    def test_msg_callback_tls12(self):
        client_ctx, server_ctx, sni = testing_context()
        client_ctx.options |= ssl.OP_NO_TLSv1_3

        # (direction, version, content_type, msg_type) per callback call
        recorded = []

        def msg_cb(conn, direction, version, content_type, msg_type, data):
            self.assertIsInstance(conn, ssl.SSLSocket)
            self.assertIsInstance(data, bytes)
            self.assertIn(direction, {'read', 'write'})
            recorded.append((direction, version, content_type, msg_type))

        client_ctx._msg_callback = msg_cb

        server = ThreadedEchoServer(context=server_ctx, chatty=False)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=sni) as s:
            s.connect((HOST, server.port))

        server_key_exchange = (
            "read", TLSVersion.TLSv1_2, _TLSContentType.HANDSHAKE,
            _TLSMessageType.SERVER_KEY_EXCHANGE)
        self.assertIn(server_key_exchange, recorded)

        change_cipher_spec = (
            "write", TLSVersion.TLSv1_2, _TLSContentType.CHANGE_CIPHER_SPEC,
            _TLSMessageType.CHANGE_CIPHER_SPEC)
        self.assertIn(change_cipher_spec, recorded)
Christian Heimesc7f70692019-05-31 11:44:05 +02004611
4612
def test_main(verbose=False):
    # Entry point for running the SSL test classes.
    #
    # The ``verbose`` argument is not consulted here; the diagnostic
    # banner is controlled by ``support.verbose``.
    if support.verbose:
        # Use the first platform-specific version tuple that is populated,
        # otherwise fall back to the generic platform string.
        version_probes = (
            ('Mac', platform.mac_ver),
            ('Windows', platform.win32_ver),
        )
        plat = None
        for name, probe in version_probes:
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (name, info)
                break
        if plat is None:
            plat = repr(platform.platform())

        openssl_details = (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)
        print("test_ssl: testing with %r %r" % openssl_details)
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be missing from the ssl module; skip it then.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    # Fail early if any certificate fixture is missing from disk.
    required_files = (
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    )
    for path in required_files:
        if os.path.exists(path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % path)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth, TestSSLDebug,
    ]
    # Networked tests only run when the 'network' resource is enabled.
    if support.is_resource_enabled('network'):
        tests += [NetworkedTests]

    # Always run threading cleanup, even if the test run raises.
    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004658
# Run the suite through test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()