blob: 4444e945952fc3ad1e290295a5d35b707aa3b6de [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
Antoine Pitrou05d936d2010-10-13 11:38:36 +000026ssl = support.import_module("ssl")
27
Martin Panter3840b2a2016-03-27 01:53:46 +000028
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010029PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000030HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020031IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010032IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
33IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Victor Stinner3ef63442019-02-19 18:06:03 +010036PROTOCOL_TO_TLS_VERSION = {}
37for proto, ver in (
38 ("PROTOCOL_SSLv23", "SSLv3"),
39 ("PROTOCOL_TLSv1", "TLSv1"),
40 ("PROTOCOL_TLSv1_1", "TLSv1_1"),
41):
42 try:
43 proto = getattr(ssl, proto)
44 ver = getattr(ssl.TLSVersion, ver)
45 except AttributeError:
46 continue
47 PROTOCOL_TO_TLS_VERSION[proto] = ver
48
Christian Heimesefff7062013-11-21 03:35:02 +010049def data_file(*name):
50 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000051
Antoine Pitrou81564092010-10-08 23:06:24 +000052# The custom key and certificate files used in test_ssl are generated
53# using Lib/test/make_ssl_certs.py.
54# Other certificates are simply fetched from the Internet servers they
55# are meant to authenticate.
56
Antoine Pitrou152efa22010-05-16 18:19:27 +000057CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000058BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000059ONLYCERT = data_file("ssl_cert.pem")
60ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000061BYTES_ONLYCERT = os.fsencode(ONLYCERT)
62BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020063CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
64ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
65KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000066CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000067BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010068CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
69CAFILE_CACERT = data_file("capath", "5ed36f99.0")
70
Christian Heimesbd5c7d22018-01-20 15:16:30 +010071CERTFILE_INFO = {
72 'issuer': ((('countryName', 'XY'),),
73 (('localityName', 'Castle Anthrax'),),
74 (('organizationName', 'Python Software Foundation'),),
75 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020076 'notAfter': 'Aug 26 14:23:15 2028 GMT',
77 'notBefore': 'Aug 29 14:23:15 2018 GMT',
78 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010079 'subject': ((('countryName', 'XY'),),
80 (('localityName', 'Castle Anthrax'),),
81 (('organizationName', 'Python Software Foundation'),),
82 (('commonName', 'localhost'),)),
83 'subjectAltName': (('DNS', 'localhost'),),
84 'version': 3
85}
Antoine Pitrou152efa22010-05-16 18:19:27 +000086
Christian Heimes22587792013-11-21 23:56:13 +010087# empty CRL
88CRLFILE = data_file("revocation.crl")
89
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010090# Two keys and certs signed by the same CA (for SNI tests)
91SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020092SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010093
94SIGNED_CERTFILE_INFO = {
95 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
96 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
97 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
98 'issuer': ((('countryName', 'XY'),),
99 (('organizationName', 'Python Software Foundation CA'),),
100 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +0200101 'notAfter': 'Jul 7 14:23:16 2028 GMT',
102 'notBefore': 'Aug 29 14:23:16 2018 GMT',
103 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100104 'subject': ((('countryName', 'XY'),),
105 (('localityName', 'Castle Anthrax'),),
106 (('organizationName', 'Python Software Foundation'),),
107 (('commonName', 'localhost'),)),
108 'subjectAltName': (('DNS', 'localhost'),),
109 'version': 3
110}
111
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100112SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200113SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100114SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
115SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
116
Martin Panter3840b2a2016-03-27 01:53:46 +0000117# Same certificate as pycacert.pem, but without extra text in file
118SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200119# cert with all kinds of subject alt names
120ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100121IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100122
Martin Panter3d81d932016-01-14 09:36:00 +0000123REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000124
125EMPTYCERT = data_file("nullcert.pem")
126BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000127NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000128BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200129NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200130NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100131TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000132
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200133DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100134BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Christian Heimes358cfd42016-09-10 22:43:48 +0200136# Not defined in all versions of OpenSSL
137OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
138OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
139OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
140OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100141OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200142
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100143
Thomas Woutersed03b412007-08-28 21:37:11 +0000144def handle_error(prefix):
145 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000146 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000147 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000148
Antoine Pitroub5218772010-05-21 09:56:06 +0000149def can_clear_options():
150 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200151 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000152
153def no_sslv2_implies_sslv3_hello():
154 # 0.9.7h or higher
155 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
156
Christian Heimes2427b502013-11-23 11:24:32 +0100157def have_verify_flags():
158 # 0.9.8 or higher
159 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
160
Christian Heimesb7b92252018-02-25 09:49:31 +0100161def _have_secp_curves():
162 if not ssl.HAS_ECDH:
163 return False
164 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
165 try:
166 ctx.set_ecdh_curve("secp384r1")
167 except ValueError:
168 return False
169 else:
170 return True
171
172
173HAVE_SECP_CURVES = _have_secp_curves()
174
175
Antoine Pitrouc695c952014-04-28 20:57:36 +0200176def utc_offset(): #NOTE: ignore issues like #1647654
177 # local time = utc time + utc offset
178 if time.daylight and time.localtime().tm_isdst > 0:
179 return -time.altzone # seconds
180 return -time.timezone
181
Christian Heimes9424bb42013-06-17 15:32:57 +0200182def asn1time(cert_time):
183 # Some versions of OpenSSL ignore seconds, see #18207
184 # 0.9.8.i
185 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
186 fmt = "%b %d %H:%M:%S %Y GMT"
187 dt = datetime.datetime.strptime(cert_time, fmt)
188 dt = dt.replace(second=0)
189 cert_time = dt.strftime(fmt)
190 # %d adds leading zero but ASN1_TIME_print() uses leading space
191 if cert_time[4] == "0":
192 cert_time = cert_time[:4] + " " + cert_time[5:]
193
194 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000195
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100196needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
197
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
Christian Heimesd0486372016-09-10 23:23:33 +0200199def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
200 cert_reqs=ssl.CERT_NONE, ca_certs=None,
201 ciphers=None, certfile=None, keyfile=None,
202 **kwargs):
203 context = ssl.SSLContext(ssl_version)
204 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200205 if cert_reqs == ssl.CERT_NONE:
206 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200207 context.verify_mode = cert_reqs
208 if ca_certs is not None:
209 context.load_verify_locations(ca_certs)
210 if certfile is not None or keyfile is not None:
211 context.load_cert_chain(certfile, keyfile)
212 if ciphers is not None:
213 context.set_ciphers(ciphers)
214 return context.wrap_socket(sock, **kwargs)
215
Christian Heimesa170fa12017-09-15 20:27:30 +0200216
217def testing_context(server_cert=SIGNED_CERTFILE):
218 """Create context
219
220 client_context, server_context, hostname = testing_context()
221 """
222 if server_cert == SIGNED_CERTFILE:
223 hostname = SIGNED_CERTFILE_HOSTNAME
224 elif server_cert == SIGNED_CERTFILE2:
225 hostname = SIGNED_CERTFILE2_HOSTNAME
226 else:
227 raise ValueError(server_cert)
228
229 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
230 client_context.load_verify_locations(SIGNING_CA)
231
232 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
233 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200234 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200235
236 return client_context, server_context, hostname
237
238
Antoine Pitrou152efa22010-05-16 18:19:27 +0000239class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000240
Antoine Pitrou480a1242010-04-28 21:37:09 +0000241 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000242 ssl.CERT_NONE
243 ssl.CERT_OPTIONAL
244 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100245 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100246 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100247 if ssl.HAS_ECDH:
248 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100249 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
250 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000251 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100252 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700253 ssl.OP_NO_SSLv2
254 ssl.OP_NO_SSLv3
255 ssl.OP_NO_TLSv1
256 ssl.OP_NO_TLSv1_3
257 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
258 ssl.OP_NO_TLSv1_1
259 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200260 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000261
Christian Heimes9d50ab52018-02-27 10:17:30 +0100262 def test_private_init(self):
263 with self.assertRaisesRegex(TypeError, "public constructor"):
264 with socket.socket() as s:
265 ssl.SSLSocket(s)
266
Antoine Pitrou172f0252014-04-18 20:33:08 +0200267 def test_str_for_enums(self):
268 # Make sure that the PROTOCOL_* constants have enum-like string
269 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200270 proto = ssl.PROTOCOL_TLS
271 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200272 ctx = ssl.SSLContext(proto)
273 self.assertIs(ctx.protocol, proto)
274
Antoine Pitrou480a1242010-04-28 21:37:09 +0000275 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000276 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000277 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 sys.stdout.write("\n RAND_status is %d (%s)\n"
279 % (v, (v and "sufficient randomness") or
280 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200281
282 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
283 self.assertEqual(len(data), 16)
284 self.assertEqual(is_cryptographic, v == 1)
285 if v:
286 data = ssl.RAND_bytes(16)
287 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200288 else:
289 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200290
Victor Stinner1e81a392013-12-19 16:47:04 +0100291 # negative num is invalid
292 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
293 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
294
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100295 if hasattr(ssl, 'RAND_egd'):
296 self.assertRaises(TypeError, ssl.RAND_egd, 1)
297 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000298 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200299 ssl.RAND_add(b"this is a random bytes object", 75.0)
300 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000301
Christian Heimesf77b4b22013-08-21 13:26:05 +0200302 @unittest.skipUnless(os.name == 'posix', 'requires posix')
303 def test_random_fork(self):
304 status = ssl.RAND_status()
305 if not status:
306 self.fail("OpenSSL's PRNG has insufficient randomness")
307
308 rfd, wfd = os.pipe()
309 pid = os.fork()
310 if pid == 0:
311 try:
312 os.close(rfd)
313 child_random = ssl.RAND_pseudo_bytes(16)[0]
314 self.assertEqual(len(child_random), 16)
315 os.write(wfd, child_random)
316 os.close(wfd)
317 except BaseException:
318 os._exit(1)
319 else:
320 os._exit(0)
321 else:
322 os.close(wfd)
323 self.addCleanup(os.close, rfd)
324 _, status = os.waitpid(pid, 0)
325 self.assertEqual(status, 0)
326
327 child_random = os.read(rfd, 16)
328 self.assertEqual(len(child_random), 16)
329 parent_random = ssl.RAND_pseudo_bytes(16)[0]
330 self.assertEqual(len(parent_random), 16)
331
332 self.assertNotEqual(child_random, parent_random)
333
Christian Heimese6dac002018-08-30 07:25:49 +0200334 maxDiff = None
335
Antoine Pitrou480a1242010-04-28 21:37:09 +0000336 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000337 # note that this uses an 'unofficial' function in _ssl.c,
338 # provided solely for this test, to exercise the certificate
339 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100340 self.assertEqual(
341 ssl._ssl._test_decode_cert(CERTFILE),
342 CERTFILE_INFO
343 )
344 self.assertEqual(
345 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
346 SIGNED_CERTFILE_INFO
347 )
348
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200349 # Issue #13034: the subjectAltName in some certificates
350 # (notably projects.developer.nokia.com:443) wasn't parsed
351 p = ssl._ssl._test_decode_cert(NOKIACERT)
352 if support.verbose:
353 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
354 self.assertEqual(p['subjectAltName'],
355 (('DNS', 'projects.developer.nokia.com'),
356 ('DNS', 'projects.forum.nokia.com'))
357 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100358 # extra OCSP and AIA fields
359 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
360 self.assertEqual(p['caIssuers'],
361 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
362 self.assertEqual(p['crlDistributionPoints'],
363 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000364
Christian Heimesa37f5242019-01-15 23:47:42 +0100365 def test_parse_cert_CVE_2019_5010(self):
366 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
367 if support.verbose:
368 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
369 self.assertEqual(
370 p,
371 {
372 'issuer': (
373 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
374 'notAfter': 'Jun 14 18:00:58 2028 GMT',
375 'notBefore': 'Jun 18 18:00:58 2018 GMT',
376 'serialNumber': '02',
377 'subject': ((('countryName', 'UK'),),
378 (('commonName',
379 'codenomicon-vm-2.test.lal.cisco.com'),)),
380 'subjectAltName': (
381 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
382 'version': 3
383 }
384 )
385
Christian Heimes824f7f32013-08-17 00:54:47 +0200386 def test_parse_cert_CVE_2013_4238(self):
387 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
388 if support.verbose:
389 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
390 subject = ((('countryName', 'US'),),
391 (('stateOrProvinceName', 'Oregon'),),
392 (('localityName', 'Beaverton'),),
393 (('organizationName', 'Python Software Foundation'),),
394 (('organizationalUnitName', 'Python Core Development'),),
395 (('commonName', 'null.python.org\x00example.org'),),
396 (('emailAddress', 'python-dev@python.org'),))
397 self.assertEqual(p['subject'], subject)
398 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200399 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
400 san = (('DNS', 'altnull.python.org\x00example.com'),
401 ('email', 'null@python.org\x00user@example.org'),
402 ('URI', 'http://null.python.org\x00http://example.org'),
403 ('IP Address', '192.0.2.1'),
404 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
405 else:
406 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
407 san = (('DNS', 'altnull.python.org\x00example.com'),
408 ('email', 'null@python.org\x00user@example.org'),
409 ('URI', 'http://null.python.org\x00http://example.org'),
410 ('IP Address', '192.0.2.1'),
411 ('IP Address', '<invalid>'))
412
413 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200414
Christian Heimes1c03abd2016-09-06 23:25:35 +0200415 def test_parse_all_sans(self):
416 p = ssl._ssl._test_decode_cert(ALLSANFILE)
417 self.assertEqual(p['subjectAltName'],
418 (
419 ('DNS', 'allsans'),
420 ('othername', '<unsupported>'),
421 ('othername', '<unsupported>'),
422 ('email', 'user@example.org'),
423 ('DNS', 'www.example.org'),
424 ('DirName',
425 ((('countryName', 'XY'),),
426 (('localityName', 'Castle Anthrax'),),
427 (('organizationName', 'Python Software Foundation'),),
428 (('commonName', 'dirname example'),))),
429 ('URI', 'https://www.python.org/'),
430 ('IP Address', '127.0.0.1'),
431 ('IP Address', '0:0:0:0:0:0:0:1\n'),
432 ('Registered ID', '1.2.3.4.5')
433 )
434 )
435
Antoine Pitrou480a1242010-04-28 21:37:09 +0000436 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000437 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000438 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000439 d1 = ssl.PEM_cert_to_DER_cert(pem)
440 p2 = ssl.DER_cert_to_PEM_cert(d1)
441 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000442 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000443 if not p2.startswith(ssl.PEM_HEADER + '\n'):
444 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
445 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
446 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000447
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000448 def test_openssl_version(self):
449 n = ssl.OPENSSL_VERSION_NUMBER
450 t = ssl.OPENSSL_VERSION_INFO
451 s = ssl.OPENSSL_VERSION
452 self.assertIsInstance(n, int)
453 self.assertIsInstance(t, tuple)
454 self.assertIsInstance(s, str)
455 # Some sanity checks follow
456 # >= 0.9
457 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400458 # < 3.0
459 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000460 major, minor, fix, patch, status = t
461 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400462 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000463 self.assertGreaterEqual(minor, 0)
464 self.assertLess(minor, 256)
465 self.assertGreaterEqual(fix, 0)
466 self.assertLess(fix, 256)
467 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100468 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000469 self.assertGreaterEqual(status, 0)
470 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400471 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200472 if IS_LIBRESSL:
473 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100474 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400475 else:
476 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100477 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000478
Antoine Pitrou9d543662010-04-23 23:10:32 +0000479 @support.cpython_only
480 def test_refcycle(self):
481 # Issue #7943: an SSL object doesn't create reference cycles with
482 # itself.
483 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200484 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000485 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100486 with support.check_warnings(("", ResourceWarning)):
487 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100488 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000489
Antoine Pitroua468adc2010-09-14 14:43:44 +0000490 def test_wrapped_unconnected(self):
491 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200492 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000493 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200494 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100495 self.assertRaises(OSError, ss.recv, 1)
496 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
497 self.assertRaises(OSError, ss.recvfrom, 1)
498 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
499 self.assertRaises(OSError, ss.send, b'x')
500 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200501 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100502 self.assertRaises(NotImplementedError, ss.sendmsg,
503 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200504 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
505 self.assertRaises(NotImplementedError, ss.recvmsg_into,
506 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000507
Antoine Pitrou40f08742010-04-24 22:04:40 +0000508 def test_timeout(self):
509 # Issue #8524: when creating an SSL socket, the timeout of the
510 # original socket should be retained.
511 for timeout in (None, 0.0, 5.0):
512 s = socket.socket(socket.AF_INET)
513 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200514 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100515 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000516
Christian Heimesd0486372016-09-10 23:23:33 +0200517 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000518 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000519 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000520 "certfile must be specified",
521 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000522 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000523 "certfile must be specified for server-side operations",
524 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000525 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000526 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200527 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100528 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
529 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200530 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200531 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000532 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000533 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000534 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200535 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000536 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000537 ssl.wrap_socket(sock,
538 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000539 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200540 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000541 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000542 ssl.wrap_socket(sock,
543 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000544 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000545
Martin Panter3464ea22016-02-01 21:58:11 +0000546 def bad_cert_test(self, certfile):
547 """Check that trying to use the given client certificate fails"""
548 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
549 certfile)
550 sock = socket.socket()
551 self.addCleanup(sock.close)
552 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200553 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200554 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000555
556 def test_empty_cert(self):
557 """Wrapping with an empty cert file"""
558 self.bad_cert_test("nullcert.pem")
559
560 def test_malformed_cert(self):
561 """Wrapping with a badly formatted certificate (syntax error)"""
562 self.bad_cert_test("badcert.pem")
563
564 def test_malformed_key(self):
565 """Wrapping with a badly formatted key (syntax error)"""
566 self.bad_cert_test("badkey.pem")
567
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000568 def test_match_hostname(self):
569 def ok(cert, hostname):
570 ssl.match_hostname(cert, hostname)
571 def fail(cert, hostname):
572 self.assertRaises(ssl.CertificateError,
573 ssl.match_hostname, cert, hostname)
574
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100575 # -- Hostname matching --
576
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000577 cert = {'subject': ((('commonName', 'example.com'),),)}
578 ok(cert, 'example.com')
579 ok(cert, 'ExAmple.cOm')
580 fail(cert, 'www.example.com')
581 fail(cert, '.example.com')
582 fail(cert, 'example.org')
583 fail(cert, 'exampleXcom')
584
585 cert = {'subject': ((('commonName', '*.a.com'),),)}
586 ok(cert, 'foo.a.com')
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
Mandeep Singhede2ac92017-11-27 04:01:27 +0530592 # only match wildcards when they are the only thing
593 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000594 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530595 fail(cert, 'foo.com')
596 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000597 fail(cert, 'bar.com')
598 fail(cert, 'foo.a.com')
599 fail(cert, 'bar.foo.com')
600
Christian Heimes824f7f32013-08-17 00:54:47 +0200601 # NULL bytes are bad, CVE-2013-4073
602 cert = {'subject': ((('commonName',
603 'null.python.org\x00example.org'),),)}
604 ok(cert, 'null.python.org\x00example.org') # or raise an error?
605 fail(cert, 'example.org')
606 fail(cert, 'null.python.org')
607
Georg Brandl72c98d32013-10-27 07:16:53 +0100608 # error cases with wildcards
609 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
610 fail(cert, 'bar.foo.a.com')
611 fail(cert, 'a.com')
612 fail(cert, 'Xa.com')
613 fail(cert, '.a.com')
614
615 cert = {'subject': ((('commonName', 'a.*.com'),),)}
616 fail(cert, 'a.foo.com')
617 fail(cert, 'a..com')
618 fail(cert, 'a.com')
619
620 # wildcard doesn't match IDNA prefix 'xn--'
621 idna = 'püthon.python.org'.encode("idna").decode("ascii")
622 cert = {'subject': ((('commonName', idna),),)}
623 ok(cert, idna)
624 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
625 fail(cert, idna)
626 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
627 fail(cert, idna)
628
629 # wildcard in first fragment and IDNA A-labels in sequent fragments
630 # are supported.
631 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
632 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530633 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
634 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100635 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
637
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000638 # Slightly fake real-world example
639 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
640 'subject': ((('commonName', 'linuxfrz.org'),),),
641 'subjectAltName': (('DNS', 'linuxfr.org'),
642 ('DNS', 'linuxfr.com'),
643 ('othername', '<unsupported>'))}
644 ok(cert, 'linuxfr.org')
645 ok(cert, 'linuxfr.com')
646 # Not a "DNS" entry
647 fail(cert, '<unsupported>')
648 # When there is a subjectAltName, commonName isn't used
649 fail(cert, 'linuxfrz.org')
650
651 # A pristine real-world example
652 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
653 'subject': ((('countryName', 'US'),),
654 (('stateOrProvinceName', 'California'),),
655 (('localityName', 'Mountain View'),),
656 (('organizationName', 'Google Inc'),),
657 (('commonName', 'mail.google.com'),))}
658 ok(cert, 'mail.google.com')
659 fail(cert, 'gmail.com')
660 # Only commonName is considered
661 fail(cert, 'California')
662
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100663 # -- IPv4 matching --
664 cert = {'subject': ((('commonName', 'example.com'),),),
665 'subjectAltName': (('DNS', 'example.com'),
666 ('IP Address', '10.11.12.13'),
667 ('IP Address', '14.15.16.17'))}
668 ok(cert, '10.11.12.13')
669 ok(cert, '14.15.16.17')
670 fail(cert, '14.15.16.18')
671 fail(cert, 'example.net')
672
673 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100674 if hasattr(socket, 'AF_INET6'):
675 cert = {'subject': ((('commonName', 'example.com'),),),
676 'subjectAltName': (
677 ('DNS', 'example.com'),
678 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
679 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
680 ok(cert, '2001::cafe')
681 ok(cert, '2003::baba')
682 fail(cert, '2003::bebe')
683 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100684
685 # -- Miscellaneous --
686
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000687 # Neither commonName nor subjectAltName
688 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
689 'subject': ((('countryName', 'US'),),
690 (('stateOrProvinceName', 'California'),),
691 (('localityName', 'Mountain View'),),
692 (('organizationName', 'Google Inc'),))}
693 fail(cert, 'mail.google.com')
694
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200695 # No DNS entry in subjectAltName but a commonName
696 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
697 'subject': ((('countryName', 'US'),),
698 (('stateOrProvinceName', 'California'),),
699 (('localityName', 'Mountain View'),),
700 (('commonName', 'mail.google.com'),)),
701 'subjectAltName': (('othername', 'blabla'), )}
702 ok(cert, 'mail.google.com')
703
704 # No DNS entry subjectAltName and no commonName
705 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
706 'subject': ((('countryName', 'US'),),
707 (('stateOrProvinceName', 'California'),),
708 (('localityName', 'Mountain View'),),
709 (('organizationName', 'Google Inc'),)),
710 'subjectAltName': (('othername', 'blabla'),)}
711 fail(cert, 'google.com')
712
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000713 # Empty cert / no cert
714 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
715 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
716
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200717 # Issue #17980: avoid denials of service by refusing more than one
718 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100719 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
720 with self.assertRaisesRegex(
721 ssl.CertificateError,
722 "partial wildcards in leftmost label are not supported"):
723 ssl.match_hostname(cert, 'axxb.example.com')
724
725 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
726 with self.assertRaisesRegex(
727 ssl.CertificateError,
728 "wildcard can only be present in the leftmost label"):
729 ssl.match_hostname(cert, 'www.sub.example.com')
730
731 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
732 with self.assertRaisesRegex(
733 ssl.CertificateError,
734 "too many wildcards"):
735 ssl.match_hostname(cert, 'axxbxxc.example.com')
736
737 cert = {'subject': ((('commonName', '*'),),)}
738 with self.assertRaisesRegex(
739 ssl.CertificateError,
740 "sole wildcard without additional labels are not support"):
741 ssl.match_hostname(cert, 'host')
742
743 cert = {'subject': ((('commonName', '*.com'),),)}
744 with self.assertRaisesRegex(
745 ssl.CertificateError,
746 r"hostname 'com' doesn't match '\*.com'"):
747 ssl.match_hostname(cert, 'com')
748
749 # extra checks for _inet_paton()
750 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
751 with self.assertRaises(ValueError):
752 ssl._inet_paton(invalid)
753 for ipaddr in ['127.0.0.1', '192.168.0.1']:
754 self.assertTrue(ssl._inet_paton(ipaddr))
755 if hasattr(socket, 'AF_INET6'):
756 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
757 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200758
Antoine Pitroud5323212010-10-22 18:19:07 +0000759 def test_server_side(self):
760 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200761 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000762 with socket.socket() as sock:
763 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
764 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000765
Antoine Pitroud6494802011-07-21 01:11:30 +0200766 def test_unknown_channel_binding(self):
767 # should raise ValueError for unknown type
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +0200768 s = socket.create_server(('127.0.0.1', 0))
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200769 c = socket.socket(socket.AF_INET)
770 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200771 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100772 with self.assertRaises(ValueError):
773 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200774 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200775
776 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
777 "'tls-unique' channel binding not available")
778 def test_tls_unique_channel_binding(self):
779 # unconnected should return None for known type
780 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200781 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100782 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200783 # the same for server-side
784 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200785 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100786 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200787
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600788 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200789 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600790 r = repr(ss)
791 with self.assertWarns(ResourceWarning) as cm:
792 ss = None
793 support.gc_collect()
794 self.assertIn(r, str(cm.warning.args[0]))
795
Christian Heimes6d7ad132013-06-09 18:02:55 +0200796 def test_get_default_verify_paths(self):
797 paths = ssl.get_default_verify_paths()
798 self.assertEqual(len(paths), 6)
799 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
800
801 with support.EnvironmentVarGuard() as env:
802 env["SSL_CERT_DIR"] = CAPATH
803 env["SSL_CERT_FILE"] = CERTFILE
804 paths = ssl.get_default_verify_paths()
805 self.assertEqual(paths.cafile, CERTFILE)
806 self.assertEqual(paths.capath, CAPATH)
807
Christian Heimes44109d72013-11-22 01:51:30 +0100808 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
809 def test_enum_certificates(self):
810 self.assertTrue(ssl.enum_certificates("CA"))
811 self.assertTrue(ssl.enum_certificates("ROOT"))
812
813 self.assertRaises(TypeError, ssl.enum_certificates)
814 self.assertRaises(WindowsError, ssl.enum_certificates, "")
815
Christian Heimesc2d65e12013-11-22 16:13:55 +0100816 trust_oids = set()
817 for storename in ("CA", "ROOT"):
818 store = ssl.enum_certificates(storename)
819 self.assertIsInstance(store, list)
820 for element in store:
821 self.assertIsInstance(element, tuple)
822 self.assertEqual(len(element), 3)
823 cert, enc, trust = element
824 self.assertIsInstance(cert, bytes)
825 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
826 self.assertIsInstance(trust, (set, bool))
827 if isinstance(trust, set):
828 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100829
830 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100831 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200832
Christian Heimes46bebee2013-06-09 19:03:31 +0200833 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100834 def test_enum_crls(self):
835 self.assertTrue(ssl.enum_crls("CA"))
836 self.assertRaises(TypeError, ssl.enum_crls)
837 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200838
Christian Heimes44109d72013-11-22 01:51:30 +0100839 crls = ssl.enum_crls("CA")
840 self.assertIsInstance(crls, list)
841 for element in crls:
842 self.assertIsInstance(element, tuple)
843 self.assertEqual(len(element), 2)
844 self.assertIsInstance(element[0], bytes)
845 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200846
Christian Heimes46bebee2013-06-09 19:03:31 +0200847
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100848 def test_asn1object(self):
849 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
850 '1.3.6.1.5.5.7.3.1')
851
852 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
853 self.assertEqual(val, expected)
854 self.assertEqual(val.nid, 129)
855 self.assertEqual(val.shortname, 'serverAuth')
856 self.assertEqual(val.longname, 'TLS Web Server Authentication')
857 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
858 self.assertIsInstance(val, ssl._ASN1Object)
859 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
860
861 val = ssl._ASN1Object.fromnid(129)
862 self.assertEqual(val, expected)
863 self.assertIsInstance(val, ssl._ASN1Object)
864 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100865 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
866 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100867 for i in range(1000):
868 try:
869 obj = ssl._ASN1Object.fromnid(i)
870 except ValueError:
871 pass
872 else:
873 self.assertIsInstance(obj.nid, int)
874 self.assertIsInstance(obj.shortname, str)
875 self.assertIsInstance(obj.longname, str)
876 self.assertIsInstance(obj.oid, (str, type(None)))
877
878 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
879 self.assertEqual(val, expected)
880 self.assertIsInstance(val, ssl._ASN1Object)
881 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
882 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
883 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100884 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
885 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100886
Christian Heimes72d28502013-11-23 13:56:58 +0100887 def test_purpose_enum(self):
888 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
889 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
890 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
891 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
892 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
893 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
894 '1.3.6.1.5.5.7.3.1')
895
896 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
897 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
898 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
899 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
900 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
901 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
902 '1.3.6.1.5.5.7.3.2')
903
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100904 def test_unsupported_dtls(self):
905 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
906 self.addCleanup(s.close)
907 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200908 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100909 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200910 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100911 with self.assertRaises(NotImplementedError) as cx:
912 ctx.wrap_socket(s)
913 self.assertEqual(str(cx.exception), "only stream sockets are supported")
914
Antoine Pitrouc695c952014-04-28 20:57:36 +0200915 def cert_time_ok(self, timestring, timestamp):
916 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
917
918 def cert_time_fail(self, timestring):
919 with self.assertRaises(ValueError):
920 ssl.cert_time_to_seconds(timestring)
921
922 @unittest.skipUnless(utc_offset(),
923 'local time needs to be different from UTC')
924 def test_cert_time_to_seconds_timezone(self):
925 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
926 # results if local timezone is not UTC
927 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
928 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
929
930 def test_cert_time_to_seconds(self):
931 timestring = "Jan 5 09:34:43 2018 GMT"
932 ts = 1515144883.0
933 self.cert_time_ok(timestring, ts)
934 # accept keyword parameter, assert its name
935 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
936 # accept both %e and %d (space or zero generated by strftime)
937 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
938 # case-insensitive
939 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
940 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
941 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
942 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
943 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
944 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
945 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
946 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
947
948 newyear_ts = 1230768000.0
949 # leap seconds
950 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
951 # same timestamp
952 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
953
954 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
955 # allow 60th second (even if it is not a leap second)
956 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
957 # allow 2nd leap second for compatibility with time.strptime()
958 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
959 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
960
Mike53f7a7c2017-12-14 14:04:53 +0300961 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200962 # 99991231235959Z (rfc 5280)
963 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
964
965 @support.run_with_locale('LC_ALL', '')
966 def test_cert_time_to_seconds_locale(self):
967 # `cert_time_to_seconds()` should be locale independent
968
969 def local_february_name():
970 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
971
972 if local_february_name().lower() == 'feb':
973 self.skipTest("locale-specific month name needs to be "
974 "different from C locale")
975
976 # locale-independent
977 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
978 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
979
Martin Panter3840b2a2016-03-27 01:53:46 +0000980 def test_connect_ex_error(self):
981 server = socket.socket(socket.AF_INET)
982 self.addCleanup(server.close)
983 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200984 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000985 cert_reqs=ssl.CERT_REQUIRED)
986 self.addCleanup(s.close)
987 rc = s.connect_ex((HOST, port))
988 # Issue #19919: Windows machines or VMs hosted on Windows
989 # machines sometimes return EWOULDBLOCK.
990 errors = (
991 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
992 errno.EWOULDBLOCK,
993 )
994 self.assertIn(rc, errors)
995
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100996
Antoine Pitrou152efa22010-05-16 18:19:27 +0000997class ContextTests(unittest.TestCase):
998
999 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001000 for protocol in PROTOCOLS:
1001 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +02001002 ctx = ssl.SSLContext()
1003 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001004 self.assertRaises(ValueError, ssl.SSLContext, -1)
1005 self.assertRaises(ValueError, ssl.SSLContext, 42)
1006
1007 def test_protocol(self):
1008 for proto in PROTOCOLS:
1009 ctx = ssl.SSLContext(proto)
1010 self.assertEqual(ctx.protocol, proto)
1011
1012 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001013 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001014 ctx.set_ciphers("ALL")
1015 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001016 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001017 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001018
Christian Heimes892d66e2018-01-29 14:10:18 +01001019 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1020 "Test applies only to Python default ciphers")
1021 def test_python_ciphers(self):
1022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1023 ciphers = ctx.get_ciphers()
1024 for suite in ciphers:
1025 name = suite['name']
1026 self.assertNotIn("PSK", name)
1027 self.assertNotIn("SRP", name)
1028 self.assertNotIn("MD5", name)
1029 self.assertNotIn("RC4", name)
1030 self.assertNotIn("3DES", name)
1031
Christian Heimes25bfcd52016-09-06 00:04:45 +02001032 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1033 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001034 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001035 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001036 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001037 self.assertIn('AES256-GCM-SHA384', names)
1038 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001039
Antoine Pitroub5218772010-05-21 09:56:06 +00001040 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001041 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001042 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001043 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001044 # SSLContext also enables these by default
1045 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001046 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1047 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001048 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001049 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001050 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001051 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001052 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1053 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001054 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001055 # Ubuntu has OP_NO_SSLv3 forced on by default
1056 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001057 else:
1058 with self.assertRaises(ValueError):
1059 ctx.options = 0
1060
Christian Heimesa170fa12017-09-15 20:27:30 +02001061 def test_verify_mode_protocol(self):
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001063 # Default value
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1065 ctx.verify_mode = ssl.CERT_OPTIONAL
1066 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1067 ctx.verify_mode = ssl.CERT_REQUIRED
1068 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1069 ctx.verify_mode = ssl.CERT_NONE
1070 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1071 with self.assertRaises(TypeError):
1072 ctx.verify_mode = None
1073 with self.assertRaises(ValueError):
1074 ctx.verify_mode = 42
1075
Christian Heimesa170fa12017-09-15 20:27:30 +02001076 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1078 self.assertFalse(ctx.check_hostname)
1079
1080 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1081 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1082 self.assertTrue(ctx.check_hostname)
1083
Christian Heimes61d478c2018-01-27 15:51:38 +01001084 def test_hostname_checks_common_name(self):
1085 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1086 self.assertTrue(ctx.hostname_checks_common_name)
1087 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1088 ctx.hostname_checks_common_name = True
1089 self.assertTrue(ctx.hostname_checks_common_name)
1090 ctx.hostname_checks_common_name = False
1091 self.assertFalse(ctx.hostname_checks_common_name)
1092 ctx.hostname_checks_common_name = True
1093 self.assertTrue(ctx.hostname_checks_common_name)
1094 else:
1095 with self.assertRaises(AttributeError):
1096 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001097
Christian Heimes698dde12018-02-27 11:54:43 +01001098 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1099 "required OpenSSL 1.1.0g")
1100 def test_min_max_version(self):
1101 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001102 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1103 # Fedora override the setting to TLS 1.0.
1104 self.assertIn(
1105 ctx.minimum_version,
Victor Stinner3ef63442019-02-19 18:06:03 +01001106 {ssl.TLSVersion.MINIMUM_SUPPORTED,
1107 # Fedora 29 uses TLS 1.0 by default
1108 ssl.TLSVersion.TLSv1,
1109 # RHEL 8 uses TLS 1.2 by default
1110 ssl.TLSVersion.TLSv1_2}
Christian Heimes698dde12018-02-27 11:54:43 +01001111 )
1112 self.assertEqual(
1113 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1114 )
1115
1116 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1117 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1118 self.assertEqual(
1119 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1120 )
1121 self.assertEqual(
1122 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1123 )
1124
1125 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1126 ctx.maximum_version = ssl.TLSVersion.TLSv1
1127 self.assertEqual(
1128 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1129 )
1130 self.assertEqual(
1131 ctx.maximum_version, ssl.TLSVersion.TLSv1
1132 )
1133
1134 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1135 self.assertEqual(
1136 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1137 )
1138
1139 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1140 self.assertIn(
1141 ctx.maximum_version,
1142 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1143 )
1144
1145 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1146 self.assertIn(
1147 ctx.minimum_version,
1148 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1149 )
1150
1151 with self.assertRaises(ValueError):
1152 ctx.minimum_version = 42
1153
1154 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1155
1156 self.assertEqual(
1157 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1158 )
1159 self.assertEqual(
1160 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1161 )
1162 with self.assertRaises(ValueError):
1163 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1164 with self.assertRaises(ValueError):
1165 ctx.maximum_version = ssl.TLSVersion.TLSv1
1166
1167
Christian Heimes2427b502013-11-23 11:24:32 +01001168 @unittest.skipUnless(have_verify_flags(),
1169 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001170 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001172 # default value
1173 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1174 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001175 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1176 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1177 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1178 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1179 ctx.verify_flags = ssl.VERIFY_DEFAULT
1180 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1181 # supports any value
1182 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1183 self.assertEqual(ctx.verify_flags,
1184 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1185 with self.assertRaises(TypeError):
1186 ctx.verify_flags = None
1187
Antoine Pitrou152efa22010-05-16 18:19:27 +00001188 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001189 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001190 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001191 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001192 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1193 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001194 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001195 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001196 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001197 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001198 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001199 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001200 ctx.load_cert_chain(EMPTYCERT)
1201 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001202 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001203 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1204 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1205 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001206 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001207 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001208 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001209 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001210 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001211 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1212 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001213 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001214 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001215 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001216 # Password protected key and cert
1217 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1218 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1219 ctx.load_cert_chain(CERTFILE_PROTECTED,
1220 password=bytearray(KEY_PASSWORD.encode()))
1221 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1222 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1223 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1224 bytearray(KEY_PASSWORD.encode()))
1225 with self.assertRaisesRegex(TypeError, "should be a string"):
1226 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1227 with self.assertRaises(ssl.SSLError):
1228 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1229 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1230 # openssl has a fixed limit on the password buffer.
1231 # PEM_BUFSIZE is generally set to 1kb.
1232 # Return a string larger than this.
1233 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1234 # Password callback
1235 def getpass_unicode():
1236 return KEY_PASSWORD
1237 def getpass_bytes():
1238 return KEY_PASSWORD.encode()
1239 def getpass_bytearray():
1240 return bytearray(KEY_PASSWORD.encode())
1241 def getpass_badpass():
1242 return "badpass"
1243 def getpass_huge():
1244 return b'a' * (1024 * 1024)
1245 def getpass_bad_type():
1246 return 9
1247 def getpass_exception():
1248 raise Exception('getpass error')
1249 class GetPassCallable:
1250 def __call__(self):
1251 return KEY_PASSWORD
1252 def getpass(self):
1253 return KEY_PASSWORD
1254 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1255 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1256 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1257 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1258 ctx.load_cert_chain(CERTFILE_PROTECTED,
1259 password=GetPassCallable().getpass)
1260 with self.assertRaises(ssl.SSLError):
1261 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1262 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1263 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1264 with self.assertRaisesRegex(TypeError, "must return a string"):
1265 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1266 with self.assertRaisesRegex(Exception, "getpass error"):
1267 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1268 # Make sure the password function isn't called if it isn't needed
1269 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001270
1271 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001272 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001273 ctx.load_verify_locations(CERTFILE)
1274 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1275 ctx.load_verify_locations(BYTES_CERTFILE)
1276 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1277 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001278 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001279 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001280 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001281 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001282 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001283 ctx.load_verify_locations(BADCERT)
1284 ctx.load_verify_locations(CERTFILE, CAPATH)
1285 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1286
Victor Stinner80f75e62011-01-29 11:31:20 +00001287 # Issue #10989: crash if the second argument type is invalid
1288 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1289
Christian Heimesefff7062013-11-21 03:35:02 +01001290 def test_load_verify_cadata(self):
1291 # test cadata
1292 with open(CAFILE_CACERT) as f:
1293 cacert_pem = f.read()
1294 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1295 with open(CAFILE_NEURONIO) as f:
1296 neuronio_pem = f.read()
1297 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1298
1299 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001300 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001301 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1302 ctx.load_verify_locations(cadata=cacert_pem)
1303 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1304 ctx.load_verify_locations(cadata=neuronio_pem)
1305 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1306 # cert already in hash table
1307 ctx.load_verify_locations(cadata=neuronio_pem)
1308 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1309
1310 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001311 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001312 combined = "\n".join((cacert_pem, neuronio_pem))
1313 ctx.load_verify_locations(cadata=combined)
1314 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1315
1316 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001317 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001318 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1319 neuronio_pem, "tail"]
1320 ctx.load_verify_locations(cadata="\n".join(combined))
1321 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1322
1323 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001324 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001325 ctx.load_verify_locations(cadata=cacert_der)
1326 ctx.load_verify_locations(cadata=neuronio_der)
1327 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1328 # cert already in hash table
1329 ctx.load_verify_locations(cadata=cacert_der)
1330 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1331
1332 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001333 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001334 combined = b"".join((cacert_der, neuronio_der))
1335 ctx.load_verify_locations(cadata=combined)
1336 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1337
1338 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001339 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001340 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1341
1342 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1343 ctx.load_verify_locations(cadata="broken")
1344 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1345 ctx.load_verify_locations(cadata=b"broken")
1346
1347
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001348 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001349 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001350 ctx.load_dh_params(DHFILE)
1351 if os.name != 'nt':
1352 ctx.load_dh_params(BYTES_DHFILE)
1353 self.assertRaises(TypeError, ctx.load_dh_params)
1354 self.assertRaises(TypeError, ctx.load_dh_params, None)
1355 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001356 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001357 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001358 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001359 ctx.load_dh_params(CERTFILE)
1360
Antoine Pitroub0182c82010-10-12 20:09:02 +00001361 def test_session_stats(self):
1362 for proto in PROTOCOLS:
1363 ctx = ssl.SSLContext(proto)
1364 self.assertEqual(ctx.session_stats(), {
1365 'number': 0,
1366 'connect': 0,
1367 'connect_good': 0,
1368 'connect_renegotiate': 0,
1369 'accept': 0,
1370 'accept_good': 0,
1371 'accept_renegotiate': 0,
1372 'hits': 0,
1373 'misses': 0,
1374 'timeouts': 0,
1375 'cache_full': 0,
1376 })
1377
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001378 def test_set_default_verify_paths(self):
1379 # There's not much we can do to test that it acts as expected,
1380 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001381 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001382 ctx.set_default_verify_paths()
1383
Antoine Pitrou501da612011-12-21 09:27:41 +01001384 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001385 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001386 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001387 ctx.set_ecdh_curve("prime256v1")
1388 ctx.set_ecdh_curve(b"prime256v1")
1389 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1390 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1391 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1392 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1393
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001394 @needs_sni
1395 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001396 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001397
1398 # set_servername_callback expects a callable, or None
1399 self.assertRaises(TypeError, ctx.set_servername_callback)
1400 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1401 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1402 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1403
1404 def dummycallback(sock, servername, ctx):
1405 pass
1406 ctx.set_servername_callback(None)
1407 ctx.set_servername_callback(dummycallback)
1408
1409 @needs_sni
1410 def test_sni_callback_refcycle(self):
1411 # Reference cycles through the servername callback are detected
1412 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001413 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001414 def dummycallback(sock, servername, ctx, cycle=ctx):
1415 pass
1416 ctx.set_servername_callback(dummycallback)
1417 wr = weakref.ref(ctx)
1418 del ctx, dummycallback
1419 gc.collect()
1420 self.assertIs(wr(), None)
1421
Christian Heimes9a5395a2013-06-17 15:44:12 +02001422 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001423 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001424 self.assertEqual(ctx.cert_store_stats(),
1425 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1426 ctx.load_cert_chain(CERTFILE)
1427 self.assertEqual(ctx.cert_store_stats(),
1428 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1429 ctx.load_verify_locations(CERTFILE)
1430 self.assertEqual(ctx.cert_store_stats(),
1431 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001432 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001433 self.assertEqual(ctx.cert_store_stats(),
1434 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1435
1436 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001437 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001438 self.assertEqual(ctx.get_ca_certs(), [])
1439 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1440 ctx.load_verify_locations(CERTFILE)
1441 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001442 # but CAFILE_CACERT is a CA cert
1443 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001444 self.assertEqual(ctx.get_ca_certs(),
1445 [{'issuer': ((('organizationName', 'Root CA'),),
1446 (('organizationalUnitName', 'http://www.cacert.org'),),
1447 (('commonName', 'CA Cert Signing Authority'),),
1448 (('emailAddress', 'support@cacert.org'),)),
1449 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1450 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1451 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001452 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001453 'subject': ((('organizationName', 'Root CA'),),
1454 (('organizationalUnitName', 'http://www.cacert.org'),),
1455 (('commonName', 'CA Cert Signing Authority'),),
1456 (('emailAddress', 'support@cacert.org'),)),
1457 'version': 3}])
1458
Martin Panterb55f8b72016-01-14 12:53:56 +00001459 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001460 pem = f.read()
1461 der = ssl.PEM_cert_to_DER_cert(pem)
1462 self.assertEqual(ctx.get_ca_certs(True), [der])
1463
Christian Heimes72d28502013-11-23 13:56:58 +01001464 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001465 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001466 ctx.load_default_certs()
1467
Christian Heimesa170fa12017-09-15 20:27:30 +02001468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001469 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1470 ctx.load_default_certs()
1471
Christian Heimesa170fa12017-09-15 20:27:30 +02001472 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001473 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1474
Christian Heimesa170fa12017-09-15 20:27:30 +02001475 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001476 self.assertRaises(TypeError, ctx.load_default_certs, None)
1477 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1478
Benjamin Peterson91244e02014-10-03 18:17:15 -04001479 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001480 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001481 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001482 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001483 with support.EnvironmentVarGuard() as env:
1484 env["SSL_CERT_DIR"] = CAPATH
1485 env["SSL_CERT_FILE"] = CERTFILE
1486 ctx.load_default_certs()
1487 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1488
Benjamin Peterson91244e02014-10-03 18:17:15 -04001489 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001490 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001491 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001492 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001493 ctx.load_default_certs()
1494 stats = ctx.cert_store_stats()
1495
Christian Heimesa170fa12017-09-15 20:27:30 +02001496 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001497 with support.EnvironmentVarGuard() as env:
1498 env["SSL_CERT_DIR"] = CAPATH
1499 env["SSL_CERT_FILE"] = CERTFILE
1500 ctx.load_default_certs()
1501 stats["x509"] += 1
1502 self.assertEqual(ctx.cert_store_stats(), stats)
1503
Christian Heimes358cfd42016-09-10 22:43:48 +02001504 def _assert_context_options(self, ctx):
1505 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1506 if OP_NO_COMPRESSION != 0:
1507 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1508 OP_NO_COMPRESSION)
1509 if OP_SINGLE_DH_USE != 0:
1510 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1511 OP_SINGLE_DH_USE)
1512 if OP_SINGLE_ECDH_USE != 0:
1513 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1514 OP_SINGLE_ECDH_USE)
1515 if OP_CIPHER_SERVER_PREFERENCE != 0:
1516 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1517 OP_CIPHER_SERVER_PREFERENCE)
1518
Christian Heimes4c05b472013-11-23 15:58:30 +01001519 def test_create_default_context(self):
1520 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001521
Christian Heimesa170fa12017-09-15 20:27:30 +02001522 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001523 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001524 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001525 self._assert_context_options(ctx)
1526
Christian Heimes4c05b472013-11-23 15:58:30 +01001527 with open(SIGNING_CA) as f:
1528 cadata = f.read()
1529 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1530 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001531 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001532 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001533 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001534
1535 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001536 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001537 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001538 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001539
Christian Heimes67986f92013-11-23 22:43:47 +01001540 def test__create_stdlib_context(self):
1541 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001542 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001543 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001544 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001545 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001546
1547 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1548 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1549 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001550 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001551
1552 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001553 cert_reqs=ssl.CERT_REQUIRED,
1554 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001555 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1556 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001557 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001558 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001559
1560 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001561 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001562 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001563 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001564
Christian Heimes1aa9a752013-12-02 02:41:19 +01001565 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001568 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001569
Christian Heimese82c0342017-09-15 20:29:57 +02001570 # Auto set CERT_REQUIRED
1571 ctx.check_hostname = True
1572 self.assertTrue(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1574 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001575 ctx.verify_mode = ssl.CERT_REQUIRED
1576 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001577 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001578
Christian Heimese82c0342017-09-15 20:29:57 +02001579 # Changing verify_mode does not affect check_hostname
1580 ctx.check_hostname = False
1581 ctx.verify_mode = ssl.CERT_NONE
1582 ctx.check_hostname = False
1583 self.assertFalse(ctx.check_hostname)
1584 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1585 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001586 ctx.check_hostname = True
1587 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001588 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1589
1590 ctx.check_hostname = False
1591 ctx.verify_mode = ssl.CERT_OPTIONAL
1592 ctx.check_hostname = False
1593 self.assertFalse(ctx.check_hostname)
1594 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1595 # keep CERT_OPTIONAL
1596 ctx.check_hostname = True
1597 self.assertTrue(ctx.check_hostname)
1598 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001599
1600 # Cannot set CERT_NONE with check_hostname enabled
1601 with self.assertRaises(ValueError):
1602 ctx.verify_mode = ssl.CERT_NONE
1603 ctx.check_hostname = False
1604 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001605 ctx.verify_mode = ssl.CERT_NONE
1606 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001607
Christian Heimes5fe668c2016-09-12 00:01:11 +02001608 def test_context_client_server(self):
1609 # PROTOCOL_TLS_CLIENT has sane defaults
1610 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1611 self.assertTrue(ctx.check_hostname)
1612 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1613
1614 # PROTOCOL_TLS_SERVER has different but also sane defaults
1615 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1616 self.assertFalse(ctx.check_hostname)
1617 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1618
Christian Heimes4df60f12017-09-15 20:26:05 +02001619 def test_context_custom_class(self):
1620 class MySSLSocket(ssl.SSLSocket):
1621 pass
1622
1623 class MySSLObject(ssl.SSLObject):
1624 pass
1625
1626 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1627 ctx.sslsocket_class = MySSLSocket
1628 ctx.sslobject_class = MySSLObject
1629
1630 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1631 self.assertIsInstance(sock, MySSLSocket)
1632 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1633 self.assertIsInstance(obj, MySSLObject)
1634
Antoine Pitrou152efa22010-05-16 18:19:27 +00001635
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001636class SSLErrorTests(unittest.TestCase):
1637
1638 def test_str(self):
1639 # The str() of a SSLError doesn't include the errno
1640 e = ssl.SSLError(1, "foo")
1641 self.assertEqual(str(e), "foo")
1642 self.assertEqual(e.errno, 1)
1643 # Same for a subclass
1644 e = ssl.SSLZeroReturnError(1, "foo")
1645 self.assertEqual(str(e), "foo")
1646 self.assertEqual(e.errno, 1)
1647
1648 def test_lib_reason(self):
1649 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001650 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001651 with self.assertRaises(ssl.SSLError) as cm:
1652 ctx.load_dh_params(CERTFILE)
1653 self.assertEqual(cm.exception.library, 'PEM')
1654 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1655 s = str(cm.exception)
1656 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1657
1658 def test_subclass(self):
1659 # Check that the appropriate SSLError subclass is raised
1660 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001661 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1662 ctx.check_hostname = False
1663 ctx.verify_mode = ssl.CERT_NONE
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02001664 with socket.create_server(("127.0.0.1", 0)) as s:
1665 c = socket.create_connection(s.getsockname())
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001666 c.setblocking(False)
1667 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001668 with self.assertRaises(ssl.SSLWantReadError) as cm:
1669 c.do_handshake()
1670 s = str(cm.exception)
1671 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1672 # For compatibility
1673 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1674
1675
Christian Heimes61d478c2018-01-27 15:51:38 +01001676 def test_bad_server_hostname(self):
1677 ctx = ssl.create_default_context()
1678 with self.assertRaises(ValueError):
1679 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1680 server_hostname="")
1681 with self.assertRaises(ValueError):
1682 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1683 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001684 with self.assertRaises(TypeError):
1685 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1686 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001687
1688
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001689class MemoryBIOTests(unittest.TestCase):
1690
1691 def test_read_write(self):
1692 bio = ssl.MemoryBIO()
1693 bio.write(b'foo')
1694 self.assertEqual(bio.read(), b'foo')
1695 self.assertEqual(bio.read(), b'')
1696 bio.write(b'foo')
1697 bio.write(b'bar')
1698 self.assertEqual(bio.read(), b'foobar')
1699 self.assertEqual(bio.read(), b'')
1700 bio.write(b'baz')
1701 self.assertEqual(bio.read(2), b'ba')
1702 self.assertEqual(bio.read(1), b'z')
1703 self.assertEqual(bio.read(1), b'')
1704
1705 def test_eof(self):
1706 bio = ssl.MemoryBIO()
1707 self.assertFalse(bio.eof)
1708 self.assertEqual(bio.read(), b'')
1709 self.assertFalse(bio.eof)
1710 bio.write(b'foo')
1711 self.assertFalse(bio.eof)
1712 bio.write_eof()
1713 self.assertFalse(bio.eof)
1714 self.assertEqual(bio.read(2), b'fo')
1715 self.assertFalse(bio.eof)
1716 self.assertEqual(bio.read(1), b'o')
1717 self.assertTrue(bio.eof)
1718 self.assertEqual(bio.read(), b'')
1719 self.assertTrue(bio.eof)
1720
1721 def test_pending(self):
1722 bio = ssl.MemoryBIO()
1723 self.assertEqual(bio.pending, 0)
1724 bio.write(b'foo')
1725 self.assertEqual(bio.pending, 3)
1726 for i in range(3):
1727 bio.read(1)
1728 self.assertEqual(bio.pending, 3-i-1)
1729 for i in range(3):
1730 bio.write(b'x')
1731 self.assertEqual(bio.pending, i+1)
1732 bio.read()
1733 self.assertEqual(bio.pending, 0)
1734
1735 def test_buffer_types(self):
1736 bio = ssl.MemoryBIO()
1737 bio.write(b'foo')
1738 self.assertEqual(bio.read(), b'foo')
1739 bio.write(bytearray(b'bar'))
1740 self.assertEqual(bio.read(), b'bar')
1741 bio.write(memoryview(b'baz'))
1742 self.assertEqual(bio.read(), b'baz')
1743
1744 def test_error_types(self):
1745 bio = ssl.MemoryBIO()
1746 self.assertRaises(TypeError, bio.write, 'foo')
1747 self.assertRaises(TypeError, bio.write, None)
1748 self.assertRaises(TypeError, bio.write, True)
1749 self.assertRaises(TypeError, bio.write, 1)
1750
1751
Christian Heimes9d50ab52018-02-27 10:17:30 +01001752class SSLObjectTests(unittest.TestCase):
1753 def test_private_init(self):
1754 bio = ssl.MemoryBIO()
1755 with self.assertRaisesRegex(TypeError, "public constructor"):
1756 ssl.SSLObject(bio, bio)
1757
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001758 def test_unwrap(self):
1759 client_ctx, server_ctx, hostname = testing_context()
1760 c_in = ssl.MemoryBIO()
1761 c_out = ssl.MemoryBIO()
1762 s_in = ssl.MemoryBIO()
1763 s_out = ssl.MemoryBIO()
1764 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1765 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1766
1767 # Loop on the handshake for a bit to get it settled
1768 for _ in range(5):
1769 try:
1770 client.do_handshake()
1771 except ssl.SSLWantReadError:
1772 pass
1773 if c_out.pending:
1774 s_in.write(c_out.read())
1775 try:
1776 server.do_handshake()
1777 except ssl.SSLWantReadError:
1778 pass
1779 if s_out.pending:
1780 c_in.write(s_out.read())
1781 # Now the handshakes should be complete (don't raise WantReadError)
1782 client.do_handshake()
1783 server.do_handshake()
1784
1785 # Now if we unwrap one side unilaterally, it should send close-notify
1786 # and raise WantReadError:
1787 with self.assertRaises(ssl.SSLWantReadError):
1788 client.unwrap()
1789
1790 # But server.unwrap() does not raise, because it reads the client's
1791 # close-notify:
1792 s_in.write(c_out.read())
1793 server.unwrap()
1794
1795 # And now that the client gets the server's close-notify, it doesn't
1796 # raise either.
1797 c_in.write(s_out.read())
1798 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001799
Martin Panter3840b2a2016-03-27 01:53:46 +00001800class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001801 """Tests that connect to a simple server running in the background"""
1802
1803 def setUp(self):
1804 server = ThreadedEchoServer(SIGNED_CERTFILE)
1805 self.server_addr = (HOST, server.port)
1806 server.__enter__()
1807 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001808
Antoine Pitrou480a1242010-04-28 21:37:09 +00001809 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001810 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001811 cert_reqs=ssl.CERT_NONE) as s:
1812 s.connect(self.server_addr)
1813 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001814 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001815
Martin Panter3840b2a2016-03-27 01:53:46 +00001816 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001817 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001818 cert_reqs=ssl.CERT_REQUIRED,
1819 ca_certs=SIGNING_CA) as s:
1820 s.connect(self.server_addr)
1821 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001822 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001823
Martin Panter3840b2a2016-03-27 01:53:46 +00001824 def test_connect_fail(self):
1825 # This should fail because we have no verification certs. Connection
1826 # failure crashes ThreadedEchoServer, so run this in an independent
1827 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001828 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001829 cert_reqs=ssl.CERT_REQUIRED)
1830 self.addCleanup(s.close)
1831 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1832 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001833
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001834 def test_connect_ex(self):
1835 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001836 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001837 cert_reqs=ssl.CERT_REQUIRED,
1838 ca_certs=SIGNING_CA)
1839 self.addCleanup(s.close)
1840 self.assertEqual(0, s.connect_ex(self.server_addr))
1841 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001842
1843 def test_non_blocking_connect_ex(self):
1844 # Issue #11326: non-blocking connect_ex() should allow handshake
1845 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001846 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001847 cert_reqs=ssl.CERT_REQUIRED,
1848 ca_certs=SIGNING_CA,
1849 do_handshake_on_connect=False)
1850 self.addCleanup(s.close)
1851 s.setblocking(False)
1852 rc = s.connect_ex(self.server_addr)
1853 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1854 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1855 # Wait for connect to finish
1856 select.select([], [s], [], 5.0)
1857 # Non-blocking handshake
1858 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001859 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001860 s.do_handshake()
1861 break
1862 except ssl.SSLWantReadError:
1863 select.select([s], [], [], 5.0)
1864 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001865 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001866 # SSL established
1867 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001868
Antoine Pitrou152efa22010-05-16 18:19:27 +00001869 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001870 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001871 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1873 s.connect(self.server_addr)
1874 self.assertEqual({}, s.getpeercert())
1875 # Same with a server hostname
1876 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1877 server_hostname="dummy") as s:
1878 s.connect(self.server_addr)
1879 ctx.verify_mode = ssl.CERT_REQUIRED
1880 # This should succeed because we specify the root cert
1881 ctx.load_verify_locations(SIGNING_CA)
1882 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1883 s.connect(self.server_addr)
1884 cert = s.getpeercert()
1885 self.assertTrue(cert)
1886
1887 def test_connect_with_context_fail(self):
1888 # This should fail because we have no verification certs. Connection
1889 # failure crashes ThreadedEchoServer, so run this in an independent
1890 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001891 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001892 ctx.verify_mode = ssl.CERT_REQUIRED
1893 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1894 self.addCleanup(s.close)
1895 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1896 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001897
1898 def test_connect_capath(self):
1899 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001900 # NOTE: the subject hashing algorithm has been changed between
1901 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1902 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001903 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001904 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001905 ctx.verify_mode = ssl.CERT_REQUIRED
1906 ctx.load_verify_locations(capath=CAPATH)
1907 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1908 s.connect(self.server_addr)
1909 cert = s.getpeercert()
1910 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001911
Martin Panter3840b2a2016-03-27 01:53:46 +00001912 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001913 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 ctx.verify_mode = ssl.CERT_REQUIRED
1915 ctx.load_verify_locations(capath=BYTES_CAPATH)
1916 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1917 s.connect(self.server_addr)
1918 cert = s.getpeercert()
1919 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001920
Christian Heimesefff7062013-11-21 03:35:02 +01001921 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001922 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001923 pem = f.read()
1924 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001925 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 ctx.verify_mode = ssl.CERT_REQUIRED
1927 ctx.load_verify_locations(cadata=pem)
1928 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1929 s.connect(self.server_addr)
1930 cert = s.getpeercert()
1931 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001932
Martin Panter3840b2a2016-03-27 01:53:46 +00001933 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001934 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001935 ctx.verify_mode = ssl.CERT_REQUIRED
1936 ctx.load_verify_locations(cadata=der)
1937 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1938 s.connect(self.server_addr)
1939 cert = s.getpeercert()
1940 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001941
Antoine Pitroue3220242010-04-24 11:13:53 +00001942 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1943 def test_makefile_close(self):
1944 # Issue #5238: creating a file-like object with makefile() shouldn't
1945 # delay closing the underlying "real socket" (here tested with its
1946 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001947 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001948 ss.connect(self.server_addr)
1949 fd = ss.fileno()
1950 f = ss.makefile()
1951 f.close()
1952 # The fd is still open
1953 os.read(fd, 0)
1954 # Closing the SSL socket should close the fd too
1955 ss.close()
1956 gc.collect()
1957 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001958 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001959 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001960
Antoine Pitrou480a1242010-04-28 21:37:09 +00001961 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001962 s = socket.socket(socket.AF_INET)
1963 s.connect(self.server_addr)
1964 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001965 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001966 cert_reqs=ssl.CERT_NONE,
1967 do_handshake_on_connect=False)
1968 self.addCleanup(s.close)
1969 count = 0
1970 while True:
1971 try:
1972 count += 1
1973 s.do_handshake()
1974 break
1975 except ssl.SSLWantReadError:
1976 select.select([s], [], [])
1977 except ssl.SSLWantWriteError:
1978 select.select([], [s], [])
1979 if support.verbose:
1980 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001981
Antoine Pitrou480a1242010-04-28 21:37:09 +00001982 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001983 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001984
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 def test_get_server_certificate_fail(self):
1986 # Connection failure crashes ThreadedEchoServer, so run this in an
1987 # independent test method
1988 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001989
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001990 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001991 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001992 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1993 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001994 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001995 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1996 s.connect(self.server_addr)
1997 # Error checking can happen at instantiation or when connecting
1998 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1999 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02002000 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00002001 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
2002 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002003
Christian Heimes9a5395a2013-06-17 15:44:12 +02002004 def test_get_ca_certs_capath(self):
2005 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02002006 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00002007 ctx.load_verify_locations(capath=CAPATH)
2008 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02002009 with ctx.wrap_socket(socket.socket(socket.AF_INET),
2010 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00002011 s.connect(self.server_addr)
2012 cert = s.getpeercert()
2013 self.assertTrue(cert)
2014 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002015
Christian Heimes575596e2013-12-15 21:49:17 +01002016 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002017 def test_context_setget(self):
2018 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002019 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2020 ctx1.load_verify_locations(capath=CAPATH)
2021 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2022 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002023 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002024 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002025 ss.connect(self.server_addr)
2026 self.assertIs(ss.context, ctx1)
2027 self.assertIs(ss._sslobj.context, ctx1)
2028 ss.context = ctx2
2029 self.assertIs(ss.context, ctx2)
2030 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002031
2032 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2033 # A simple IO loop. Call func(*args) depending on the error we get
2034 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2035 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002036 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002037 count = 0
2038 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002039 if time.monotonic() > deadline:
2040 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002041 errno = None
2042 count += 1
2043 try:
2044 ret = func(*args)
2045 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002046 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002047 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002048 raise
2049 errno = e.errno
2050 # Get any data from the outgoing BIO irrespective of any error, and
2051 # send it to the socket.
2052 buf = outgoing.read()
2053 sock.sendall(buf)
2054 # If there's no error, we're done. For WANT_READ, we need to get
2055 # data from the socket and put it in the incoming BIO.
2056 if errno is None:
2057 break
2058 elif errno == ssl.SSL_ERROR_WANT_READ:
2059 buf = sock.recv(32768)
2060 if buf:
2061 incoming.write(buf)
2062 else:
2063 incoming.write_eof()
2064 if support.verbose:
2065 sys.stdout.write("Needed %d calls to complete %s().\n"
2066 % (count, func.__name__))
2067 return ret
2068
Martin Panter3840b2a2016-03-27 01:53:46 +00002069 def test_bio_handshake(self):
2070 sock = socket.socket(socket.AF_INET)
2071 self.addCleanup(sock.close)
2072 sock.connect(self.server_addr)
2073 incoming = ssl.MemoryBIO()
2074 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002075 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2076 self.assertTrue(ctx.check_hostname)
2077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002078 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002079 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2080 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002081 self.assertIs(sslobj._sslobj.owner, sslobj)
2082 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002083 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002084 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002085 self.assertRaises(ValueError, sslobj.getpeercert)
2086 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2087 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2088 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2089 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002090 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002091 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002092 self.assertTrue(sslobj.getpeercert())
2093 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2094 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2095 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002096 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002097 except ssl.SSLSyscallError:
2098 # If the server shuts down the TCP connection without sending a
2099 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2100 pass
2101 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2102
2103 def test_bio_read_write_data(self):
2104 sock = socket.socket(socket.AF_INET)
2105 self.addCleanup(sock.close)
2106 sock.connect(self.server_addr)
2107 incoming = ssl.MemoryBIO()
2108 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002109 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 ctx.verify_mode = ssl.CERT_NONE
2111 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2112 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2113 req = b'FOO\n'
2114 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2115 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2116 self.assertEqual(buf, b'foo\n')
2117 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002118
2119
Martin Panter3840b2a2016-03-27 01:53:46 +00002120class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002121
Martin Panter3840b2a2016-03-27 01:53:46 +00002122 def test_timeout_connect_ex(self):
2123 # Issue #12065: on a timeout, connect_ex() should return the original
2124 # errno (mimicking the behaviour of non-SSL sockets).
2125 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002126 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002127 cert_reqs=ssl.CERT_REQUIRED,
2128 do_handshake_on_connect=False)
2129 self.addCleanup(s.close)
2130 s.settimeout(0.0000001)
2131 rc = s.connect_ex((REMOTE_HOST, 443))
2132 if rc == 0:
2133 self.skipTest("REMOTE_HOST responded too quickly")
2134 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2135
2136 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2137 def test_get_server_certificate_ipv6(self):
2138 with support.transient_internet('ipv6.google.com'):
2139 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2140 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2141
Martin Panter3840b2a2016-03-27 01:53:46 +00002142
2143def _test_get_server_certificate(test, host, port, cert=None):
2144 pem = ssl.get_server_certificate((host, port))
2145 if not pem:
2146 test.fail("No server certificate on %s:%s!" % (host, port))
2147
2148 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2149 if not pem:
2150 test.fail("No server certificate on %s:%s!" % (host, port))
2151 if support.verbose:
2152 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2153
2154def _test_get_server_certificate_fail(test, host, port):
2155 try:
2156 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2157 except ssl.SSLError as x:
2158 #should fail
2159 if support.verbose:
2160 sys.stdout.write("%s\n" % x)
2161 else:
2162 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2163
2164
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002165from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002166
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002167class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002169 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002170
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002171 """A mildly complicated class, because we want it to work both
2172 with and without the SSL wrapper around the socket connection, so
2173 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002174
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002175 def __init__(self, server, connsock, addr):
2176 self.server = server
2177 self.running = False
2178 self.sock = connsock
2179 self.addr = addr
2180 self.sock.setblocking(1)
2181 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002182 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002183 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002184
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002185 def wrap_conn(self):
2186 try:
2187 self.sslconn = self.server.context.wrap_socket(
2188 self.sock, server_side=True)
2189 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2190 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Christian Heimes529525f2018-05-23 22:24:45 +02002191 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002192 # We treat ConnectionResetError as though it were an
2193 # SSLError - OpenSSL on Ubuntu abruptly closes the
2194 # connection when asked to use an unsupported protocol.
2195 #
Christian Heimes529525f2018-05-23 22:24:45 +02002196 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2197 # tries to send session tickets after handshake.
2198 # https://github.com/openssl/openssl/issues/6342
2199 self.server.conn_errors.append(str(e))
2200 if self.server.chatty:
2201 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2202 self.running = False
2203 self.close()
2204 return False
2205 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002206 # OSError may occur with wrong protocols, e.g. both
2207 # sides use PROTOCOL_TLS_SERVER.
2208 #
2209 # XXX Various errors can have happened here, for example
2210 # a mismatching protocol version, an invalid certificate,
2211 # or a low-level bug. This should be made more discriminating.
2212 #
2213 # bpo-31323: Store the exception as string to prevent
2214 # a reference leak: server -> conn_errors -> exception
2215 # -> traceback -> self (ConnectionHandler) -> server
2216 self.server.conn_errors.append(str(e))
2217 if self.server.chatty:
2218 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2219 self.running = False
2220 self.server.stop()
2221 self.close()
2222 return False
2223 else:
2224 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2225 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2226 cert = self.sslconn.getpeercert()
2227 if support.verbose and self.server.chatty:
2228 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2229 cert_binary = self.sslconn.getpeercert(True)
2230 if support.verbose and self.server.chatty:
2231 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2232 cipher = self.sslconn.cipher()
2233 if support.verbose and self.server.chatty:
2234 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2235 sys.stdout.write(" server: selected protocol is now "
2236 + str(self.sslconn.selected_npn_protocol()) + "\n")
2237 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002238
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002239 def read(self):
2240 if self.sslconn:
2241 return self.sslconn.read()
2242 else:
2243 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002244
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002245 def write(self, bytes):
2246 if self.sslconn:
2247 return self.sslconn.write(bytes)
2248 else:
2249 return self.sock.send(bytes)
2250
2251 def close(self):
2252 if self.sslconn:
2253 self.sslconn.close()
2254 else:
2255 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002256
Antoine Pitrou480a1242010-04-28 21:37:09 +00002257 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002258 self.running = True
2259 if not self.server.starttls_server:
2260 if not self.wrap_conn():
2261 return
2262 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002263 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002264 msg = self.read()
2265 stripped = msg.strip()
2266 if not stripped:
2267 # eof, so quit this handler
2268 self.running = False
2269 try:
2270 self.sock = self.sslconn.unwrap()
2271 except OSError:
2272 # Many tests shut the TCP connection down
2273 # without an SSL shutdown. This causes
2274 # unwrap() to raise OSError with errno=0!
2275 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002276 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002277 self.sslconn = None
2278 self.close()
2279 elif stripped == b'over':
2280 if support.verbose and self.server.connectionchatty:
2281 sys.stdout.write(" server: client closed connection\n")
2282 self.close()
2283 return
2284 elif (self.server.starttls_server and
2285 stripped == b'STARTTLS'):
2286 if support.verbose and self.server.connectionchatty:
2287 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2288 self.write(b"OK\n")
2289 if not self.wrap_conn():
2290 return
2291 elif (self.server.starttls_server and self.sslconn
2292 and stripped == b'ENDTLS'):
2293 if support.verbose and self.server.connectionchatty:
2294 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2295 self.write(b"OK\n")
2296 self.sock = self.sslconn.unwrap()
2297 self.sslconn = None
2298 if support.verbose and self.server.connectionchatty:
2299 sys.stdout.write(" server: connection is now unencrypted...\n")
2300 elif stripped == b'CB tls-unique':
2301 if support.verbose and self.server.connectionchatty:
2302 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2303 data = self.sslconn.get_channel_binding("tls-unique")
2304 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002305 elif stripped == b'PHA':
2306 if support.verbose and self.server.connectionchatty:
2307 sys.stdout.write(" server: initiating post handshake auth\n")
2308 try:
2309 self.sslconn.verify_client_post_handshake()
2310 except ssl.SSLError as e:
2311 self.write(repr(e).encode("us-ascii") + b"\n")
2312 else:
2313 self.write(b"OK\n")
2314 elif stripped == b'HASCERT':
2315 if self.sslconn.getpeercert() is not None:
2316 self.write(b'TRUE\n')
2317 else:
2318 self.write(b'FALSE\n')
2319 elif stripped == b'GETCERT':
2320 cert = self.sslconn.getpeercert()
2321 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002322 else:
2323 if (support.verbose and
2324 self.server.connectionchatty):
2325 ctype = (self.sslconn and "encrypted") or "unencrypted"
2326 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2327 % (msg, ctype, msg.lower(), ctype))
2328 self.write(msg.lower())
Christian Heimes529525f2018-05-23 22:24:45 +02002329 except ConnectionResetError:
2330 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2331 # when connection is not shut down gracefully.
2332 if self.server.chatty and support.verbose:
2333 sys.stdout.write(
2334 " Connection reset by peer: {}\n".format(
2335 self.addr)
2336 )
2337 self.close()
2338 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002339 except OSError:
2340 if self.server.chatty:
2341 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002342 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002343 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002344
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002345 # normally, we'd just stop here, but for the test
2346 # harness, we want to stop the server
2347 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002348
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002349 def __init__(self, certificate=None, ssl_version=None,
2350 certreqs=None, cacerts=None,
2351 chatty=True, connectionchatty=False, starttls_server=False,
2352 npn_protocols=None, alpn_protocols=None,
2353 ciphers=None, context=None):
2354 if context:
2355 self.context = context
2356 else:
2357 self.context = ssl.SSLContext(ssl_version
2358 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002359 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002360 self.context.verify_mode = (certreqs if certreqs is not None
2361 else ssl.CERT_NONE)
2362 if cacerts:
2363 self.context.load_verify_locations(cacerts)
2364 if certificate:
2365 self.context.load_cert_chain(certificate)
2366 if npn_protocols:
2367 self.context.set_npn_protocols(npn_protocols)
2368 if alpn_protocols:
2369 self.context.set_alpn_protocols(alpn_protocols)
2370 if ciphers:
2371 self.context.set_ciphers(ciphers)
2372 self.chatty = chatty
2373 self.connectionchatty = connectionchatty
2374 self.starttls_server = starttls_server
2375 self.sock = socket.socket()
2376 self.port = support.bind_port(self.sock)
2377 self.flag = None
2378 self.active = False
2379 self.selected_npn_protocols = []
2380 self.selected_alpn_protocols = []
2381 self.shared_ciphers = []
2382 self.conn_errors = []
2383 threading.Thread.__init__(self)
2384 self.daemon = True
2385
2386 def __enter__(self):
2387 self.start(threading.Event())
2388 self.flag.wait()
2389 return self
2390
2391 def __exit__(self, *args):
2392 self.stop()
2393 self.join()
2394
2395 def start(self, flag=None):
2396 self.flag = flag
2397 threading.Thread.start(self)
2398
2399 def run(self):
2400 self.sock.settimeout(0.05)
2401 self.sock.listen()
2402 self.active = True
2403 if self.flag:
2404 # signal an event
2405 self.flag.set()
2406 while self.active:
2407 try:
2408 newconn, connaddr = self.sock.accept()
2409 if support.verbose and self.chatty:
2410 sys.stdout.write(' server: new connection from '
2411 + repr(connaddr) + '\n')
2412 handler = self.ConnectionHandler(self, newconn, connaddr)
2413 handler.start()
2414 handler.join()
2415 except socket.timeout:
2416 pass
2417 except KeyboardInterrupt:
2418 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002419 except BaseException as e:
2420 if support.verbose and self.chatty:
2421 sys.stdout.write(
2422 ' connection handling failed: ' + repr(e) + '\n')
2423
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002424 self.sock.close()
2425
2426 def stop(self):
2427 self.active = False
2428
2429class AsyncoreEchoServer(threading.Thread):
2430
2431 # this one's based on asyncore.dispatcher
2432
2433 class EchoServer (asyncore.dispatcher):
2434
2435 class ConnectionHandler(asyncore.dispatcher_with_send):
2436
2437 def __init__(self, conn, certfile):
2438 self.socket = test_wrap_socket(conn, server_side=True,
2439 certfile=certfile,
2440 do_handshake_on_connect=False)
2441 asyncore.dispatcher_with_send.__init__(self, self.socket)
2442 self._ssl_accepting = True
2443 self._do_ssl_handshake()
2444
2445 def readable(self):
2446 if isinstance(self.socket, ssl.SSLSocket):
2447 while self.socket.pending() > 0:
2448 self.handle_read_event()
2449 return True
2450
2451 def _do_ssl_handshake(self):
2452 try:
2453 self.socket.do_handshake()
2454 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2455 return
2456 except ssl.SSLEOFError:
2457 return self.handle_close()
2458 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002459 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002460 except OSError as err:
2461 if err.args[0] == errno.ECONNABORTED:
2462 return self.handle_close()
2463 else:
2464 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002465
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002466 def handle_read(self):
2467 if self._ssl_accepting:
2468 self._do_ssl_handshake()
2469 else:
2470 data = self.recv(1024)
2471 if support.verbose:
2472 sys.stdout.write(" server: read %s from client\n" % repr(data))
2473 if not data:
2474 self.close()
2475 else:
2476 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002477
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002478 def handle_close(self):
2479 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002480 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002481 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002482
2483 def handle_error(self):
2484 raise
2485
Trent Nelson78520002008-04-10 20:54:35 +00002486 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002487 self.certfile = certfile
2488 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2489 self.port = support.bind_port(sock, '')
2490 asyncore.dispatcher.__init__(self, sock)
2491 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002492
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002493 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002494 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002495 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2496 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002497
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002498 def handle_error(self):
2499 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002500
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002501 def __init__(self, certfile):
2502 self.flag = None
2503 self.active = False
2504 self.server = self.EchoServer(certfile)
2505 self.port = self.server.port
2506 threading.Thread.__init__(self)
2507 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002508
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002509 def __str__(self):
2510 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002511
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002512 def __enter__(self):
2513 self.start(threading.Event())
2514 self.flag.wait()
2515 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002516
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002517 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002518 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002519 sys.stdout.write(" cleanup: stopping server.\n")
2520 self.stop()
2521 if support.verbose:
2522 sys.stdout.write(" cleanup: joining server thread.\n")
2523 self.join()
2524 if support.verbose:
2525 sys.stdout.write(" cleanup: successfully joined.\n")
2526 # make sure that ConnectionHandler is removed from socket_map
2527 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002528
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 def start (self, flag=None):
2530 self.flag = flag
2531 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002532
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002533 def run(self):
2534 self.active = True
2535 if self.flag:
2536 self.flag.set()
2537 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002538 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002539 asyncore.loop(1)
2540 except:
2541 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002542
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002543 def stop(self):
2544 self.active = False
2545 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002546
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002547def server_params_test(client_context, server_context, indata=b"FOO\n",
2548 chatty=True, connectionchatty=False, sni_name=None,
2549 session=None):
2550 """
2551 Launch a server, connect a client to it and try various reads
2552 and writes.
2553 """
2554 stats = {}
2555 server = ThreadedEchoServer(context=server_context,
2556 chatty=chatty,
2557 connectionchatty=False)
2558 with server:
2559 with client_context.wrap_socket(socket.socket(),
2560 server_hostname=sni_name, session=session) as s:
2561 s.connect((HOST, server.port))
2562 for arg in [indata, bytearray(indata), memoryview(indata)]:
2563 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002564 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002565 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002566 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002567 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002568 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002569 if connectionchatty:
2570 if support.verbose:
2571 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002572 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002573 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002574 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2575 % (outdata[:20], len(outdata),
2576 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002577 s.write(b"over\n")
2578 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002579 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002580 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581 stats.update({
2582 'compression': s.compression(),
2583 'cipher': s.cipher(),
2584 'peercert': s.getpeercert(),
2585 'client_alpn_protocol': s.selected_alpn_protocol(),
2586 'client_npn_protocol': s.selected_npn_protocol(),
2587 'version': s.version(),
2588 'session_reused': s.session_reused,
2589 'session': s.session,
2590 })
2591 s.close()
2592 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2593 stats['server_npn_protocols'] = server.selected_npn_protocols
2594 stats['server_shared_ciphers'] = server.shared_ciphers
2595 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002596
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002597def try_protocol_combo(server_protocol, client_protocol, expect_success,
2598 certsreqs=None, server_options=0, client_options=0):
2599 """
2600 Try to SSL-connect using *client_protocol* to *server_protocol*.
2601 If *expect_success* is true, assert that the connection succeeds,
2602 if it's false, assert that the connection fails.
2603 Also, if *expect_success* is a string, assert that it is the protocol
2604 version actually used by the connection.
2605 """
2606 if certsreqs is None:
2607 certsreqs = ssl.CERT_NONE
2608 certtype = {
2609 ssl.CERT_NONE: "CERT_NONE",
2610 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2611 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2612 }[certsreqs]
2613 if support.verbose:
2614 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2615 sys.stdout.write(formatstr %
2616 (ssl.get_protocol_name(client_protocol),
2617 ssl.get_protocol_name(server_protocol),
2618 certtype))
2619 client_context = ssl.SSLContext(client_protocol)
2620 client_context.options |= client_options
2621 server_context = ssl.SSLContext(server_protocol)
2622 server_context.options |= server_options
2623
Victor Stinner3ef63442019-02-19 18:06:03 +01002624 min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
2625 if (min_version is not None
2626 # SSLContext.minimum_version is only available on recent OpenSSL
2627 # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
2628 and hasattr(server_context, 'minimum_version')
2629 and server_protocol == ssl.PROTOCOL_TLS
2630 and server_context.minimum_version > min_version):
2631 # If OpenSSL configuration is strict and requires more recent TLS
2632 # version, we have to change the minimum to test old TLS versions.
2633 server_context.minimum_version = min_version
2634
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002635 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2636 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2637 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002638 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002639 client_context.set_ciphers("ALL")
2640
2641 for ctx in (client_context, server_context):
2642 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002643 ctx.load_cert_chain(SIGNED_CERTFILE)
2644 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002645 try:
2646 stats = server_params_test(client_context, server_context,
2647 chatty=False, connectionchatty=False)
2648 # Protocol mismatch can result in either an SSLError, or a
2649 # "Connection reset by peer" error.
2650 except ssl.SSLError:
2651 if expect_success:
2652 raise
2653 except OSError as e:
2654 if expect_success or e.errno != errno.ECONNRESET:
2655 raise
2656 else:
2657 if not expect_success:
2658 raise AssertionError(
2659 "Client protocol %s succeeded with server protocol %s!"
2660 % (ssl.get_protocol_name(client_protocol),
2661 ssl.get_protocol_name(server_protocol)))
2662 elif (expect_success is not True
2663 and expect_success != stats['version']):
2664 raise AssertionError("version mismatch: expected %r, got %r"
2665 % (expect_success, stats['version']))
2666
2667
2668class ThreadedTests(unittest.TestCase):
2669
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002670 def test_echo(self):
2671 """Basic test of an SSL client connecting to a server"""
2672 if support.verbose:
2673 sys.stdout.write("\n")
2674 for protocol in PROTOCOLS:
2675 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2676 continue
2677 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2678 context = ssl.SSLContext(protocol)
2679 context.load_cert_chain(CERTFILE)
2680 server_params_test(context, context,
2681 chatty=True, connectionchatty=True)
2682
Christian Heimesa170fa12017-09-15 20:27:30 +02002683 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002684
2685 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2686 server_params_test(client_context=client_context,
2687 server_context=server_context,
2688 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002689 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002690
2691 client_context.check_hostname = False
2692 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2693 with self.assertRaises(ssl.SSLError) as e:
2694 server_params_test(client_context=server_context,
2695 server_context=client_context,
2696 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002697 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002698 self.assertIn('called a function you should not call',
2699 str(e.exception))
2700
2701 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2702 with self.assertRaises(ssl.SSLError) as e:
2703 server_params_test(client_context=server_context,
2704 server_context=server_context,
2705 chatty=True, connectionchatty=True)
2706 self.assertIn('called a function you should not call',
2707 str(e.exception))
2708
2709 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2710 with self.assertRaises(ssl.SSLError) as e:
2711 server_params_test(client_context=server_context,
2712 server_context=client_context,
2713 chatty=True, connectionchatty=True)
2714 self.assertIn('called a function you should not call',
2715 str(e.exception))
2716
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002717 def test_getpeercert(self):
2718 if support.verbose:
2719 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002720
2721 client_context, server_context, hostname = testing_context()
2722 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002723 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002724 with client_context.wrap_socket(socket.socket(),
2725 do_handshake_on_connect=False,
2726 server_hostname=hostname) as s:
2727 s.connect((HOST, server.port))
2728 # getpeercert() raise ValueError while the handshake isn't
2729 # done.
2730 with self.assertRaises(ValueError):
2731 s.getpeercert()
2732 s.do_handshake()
2733 cert = s.getpeercert()
2734 self.assertTrue(cert, "Can't get peer certificate.")
2735 cipher = s.cipher()
2736 if support.verbose:
2737 sys.stdout.write(pprint.pformat(cert) + '\n')
2738 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2739 if 'subject' not in cert:
2740 self.fail("No subject field in certificate: %s." %
2741 pprint.pformat(cert))
2742 if ((('organizationName', 'Python Software Foundation'),)
2743 not in cert['subject']):
2744 self.fail(
2745 "Missing or invalid 'organizationName' field in certificate subject; "
2746 "should be 'Python Software Foundation'.")
2747 self.assertIn('notBefore', cert)
2748 self.assertIn('notAfter', cert)
2749 before = ssl.cert_time_to_seconds(cert['notBefore'])
2750 after = ssl.cert_time_to_seconds(cert['notAfter'])
2751 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002752
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002753 @unittest.skipUnless(have_verify_flags(),
2754 "verify_flags need OpenSSL > 0.9.8")
2755 def test_crl_check(self):
2756 if support.verbose:
2757 sys.stdout.write("\n")
2758
Christian Heimesa170fa12017-09-15 20:27:30 +02002759 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002760
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002761 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002762 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002763
2764 # VERIFY_DEFAULT should pass
2765 server = ThreadedEchoServer(context=server_context, chatty=True)
2766 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002767 with client_context.wrap_socket(socket.socket(),
2768 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002769 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002770 cert = s.getpeercert()
2771 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002772
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002773 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002774 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002775
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002776 server = ThreadedEchoServer(context=server_context, chatty=True)
2777 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002778 with client_context.wrap_socket(socket.socket(),
2779 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002780 with self.assertRaisesRegex(ssl.SSLError,
2781 "certificate verify failed"):
2782 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002783
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002784 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002785 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002786
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002787 server = ThreadedEchoServer(context=server_context, chatty=True)
2788 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002789 with client_context.wrap_socket(socket.socket(),
2790 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002791 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002792 cert = s.getpeercert()
2793 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002794
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002795 def test_check_hostname(self):
2796 if support.verbose:
2797 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002798
Christian Heimesa170fa12017-09-15 20:27:30 +02002799 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002800
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002801 # correct hostname should verify
2802 server = ThreadedEchoServer(context=server_context, chatty=True)
2803 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002804 with client_context.wrap_socket(socket.socket(),
2805 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002806 s.connect((HOST, server.port))
2807 cert = s.getpeercert()
2808 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002809
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002810 # incorrect hostname should raise an exception
2811 server = ThreadedEchoServer(context=server_context, chatty=True)
2812 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002813 with client_context.wrap_socket(socket.socket(),
2814 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002815 with self.assertRaisesRegex(
2816 ssl.CertificateError,
2817 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002818 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002819
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002820 # missing server_hostname arg should cause an exception, too
2821 server = ThreadedEchoServer(context=server_context, chatty=True)
2822 with server:
2823 with socket.socket() as s:
2824 with self.assertRaisesRegex(ValueError,
2825 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002826 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002827
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002828 def test_ecc_cert(self):
2829 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2830 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002831 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002832 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2833
2834 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2835 # load ECC cert
2836 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2837
2838 # correct hostname should verify
2839 server = ThreadedEchoServer(context=server_context, chatty=True)
2840 with server:
2841 with client_context.wrap_socket(socket.socket(),
2842 server_hostname=hostname) as s:
2843 s.connect((HOST, server.port))
2844 cert = s.getpeercert()
2845 self.assertTrue(cert, "Can't get peer certificate.")
2846 cipher = s.cipher()[0].split('-')
2847 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2848
2849 def test_dual_rsa_ecc(self):
2850 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2851 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002852 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2853 # algorithms.
2854 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002855 # only ECDSA certs
2856 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2857 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2858
2859 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2860 # load ECC and RSA key/cert pairs
2861 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2862 server_context.load_cert_chain(SIGNED_CERTFILE)
2863
2864 # correct hostname should verify
2865 server = ThreadedEchoServer(context=server_context, chatty=True)
2866 with server:
2867 with client_context.wrap_socket(socket.socket(),
2868 server_hostname=hostname) as s:
2869 s.connect((HOST, server.port))
2870 cert = s.getpeercert()
2871 self.assertTrue(cert, "Can't get peer certificate.")
2872 cipher = s.cipher()[0].split('-')
2873 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2874
Christian Heimes66e57422018-01-29 14:25:13 +01002875 def test_check_hostname_idn(self):
2876 if support.verbose:
2877 sys.stdout.write("\n")
2878
Christian Heimes11a14932018-02-24 02:35:08 +01002879 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002880 server_context.load_cert_chain(IDNSANSFILE)
2881
Christian Heimes11a14932018-02-24 02:35:08 +01002882 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002883 context.verify_mode = ssl.CERT_REQUIRED
2884 context.check_hostname = True
2885 context.load_verify_locations(SIGNING_CA)
2886
2887 # correct hostname should verify, when specified in several
2888 # different ways
2889 idn_hostnames = [
2890 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002891 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002892 ('xn--knig-5qa.idn.pythontest.net',
2893 'xn--knig-5qa.idn.pythontest.net'),
2894 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002895 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002896
2897 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002898 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002899 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2900 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2901 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002902 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2903
2904 # ('königsgäßchen.idna2008.pythontest.net',
2905 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2906 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2907 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2908 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2909 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2910
Christian Heimes66e57422018-01-29 14:25:13 +01002911 ]
2912 for server_hostname, expected_hostname in idn_hostnames:
2913 server = ThreadedEchoServer(context=server_context, chatty=True)
2914 with server:
2915 with context.wrap_socket(socket.socket(),
2916 server_hostname=server_hostname) as s:
2917 self.assertEqual(s.server_hostname, expected_hostname)
2918 s.connect((HOST, server.port))
2919 cert = s.getpeercert()
2920 self.assertEqual(s.server_hostname, expected_hostname)
2921 self.assertTrue(cert, "Can't get peer certificate.")
2922
Christian Heimes66e57422018-01-29 14:25:13 +01002923 # incorrect hostname should raise an exception
2924 server = ThreadedEchoServer(context=server_context, chatty=True)
2925 with server:
2926 with context.wrap_socket(socket.socket(),
2927 server_hostname="python.example.org") as s:
2928 with self.assertRaises(ssl.CertificateError):
2929 s.connect((HOST, server.port))
2930
Christian Heimes529525f2018-05-23 22:24:45 +02002931 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002932 """Connecting when the server rejects the client's certificate
2933
2934 Launch a server with CERT_REQUIRED, and check that trying to
2935 connect to it with a wrong client certificate fails.
2936 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002937 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002938 # load client cert that is not signed by trusted CA
2939 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002940 # require TLS client authentication
2941 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002942 # TLS 1.3 has different handshake
2943 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002944
2945 server = ThreadedEchoServer(
2946 context=server_context, chatty=True, connectionchatty=True,
2947 )
2948
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002949 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002950 client_context.wrap_socket(socket.socket(),
2951 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002952 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 # Expect either an SSL error about the server rejecting
2954 # the connection, or a low-level connection reset (which
2955 # sometimes happens on Windows)
2956 s.connect((HOST, server.port))
2957 except ssl.SSLError as e:
2958 if support.verbose:
2959 sys.stdout.write("\nSSLError is %r\n" % e)
2960 except OSError as e:
2961 if e.errno != errno.ECONNRESET:
2962 raise
2963 if support.verbose:
2964 sys.stdout.write("\nsocket.error is %r\n" % e)
2965 else:
2966 self.fail("Use of invalid cert should have failed!")
2967
Christian Heimes529525f2018-05-23 22:24:45 +02002968 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2969 def test_wrong_cert_tls13(self):
2970 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002971 # load client cert that is not signed by trusted CA
2972 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02002973 server_context.verify_mode = ssl.CERT_REQUIRED
2974 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2975 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2976
2977 server = ThreadedEchoServer(
2978 context=server_context, chatty=True, connectionchatty=True,
2979 )
2980 with server, \
2981 client_context.wrap_socket(socket.socket(),
2982 server_hostname=hostname) as s:
2983 # TLS 1.3 perform client cert exchange after handshake
2984 s.connect((HOST, server.port))
2985 try:
2986 s.write(b'data')
2987 s.read(4)
2988 except ssl.SSLError as e:
2989 if support.verbose:
2990 sys.stdout.write("\nSSLError is %r\n" % e)
2991 except OSError as e:
2992 if e.errno != errno.ECONNRESET:
2993 raise
2994 if support.verbose:
2995 sys.stdout.write("\nsocket.error is %r\n" % e)
2996 else:
2997 self.fail("Use of invalid cert should have failed!")
2998
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002999 def test_rude_shutdown(self):
3000 """A brutal shutdown of an SSL server should raise an OSError
3001 in the client when attempting handshake.
3002 """
3003 listener_ready = threading.Event()
3004 listener_gone = threading.Event()
3005
3006 s = socket.socket()
3007 port = support.bind_port(s, HOST)
3008
3009 # `listener` runs in a thread. It sits in an accept() until
3010 # the main thread connects. Then it rudely closes the socket,
3011 # and sets Event `listener_gone` to let the main thread know
3012 # the socket is gone.
3013 def listener():
3014 s.listen()
3015 listener_ready.set()
3016 newsock, addr = s.accept()
3017 newsock.close()
3018 s.close()
3019 listener_gone.set()
3020
3021 def connector():
3022 listener_ready.wait()
3023 with socket.socket() as c:
3024 c.connect((HOST, port))
3025 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003026 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003027 ssl_sock = test_wrap_socket(c)
3028 except OSError:
3029 pass
3030 else:
3031 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 t = threading.Thread(target=listener)
3034 t.start()
3035 try:
3036 connector()
3037 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003038 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003039
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003040 def test_ssl_cert_verify_error(self):
3041 if support.verbose:
3042 sys.stdout.write("\n")
3043
3044 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3045 server_context.load_cert_chain(SIGNED_CERTFILE)
3046
3047 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3048
3049 server = ThreadedEchoServer(context=server_context, chatty=True)
3050 with server:
3051 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003052 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003053 try:
3054 s.connect((HOST, server.port))
3055 except ssl.SSLError as e:
3056 msg = 'unable to get local issuer certificate'
3057 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3058 self.assertEqual(e.verify_code, 20)
3059 self.assertEqual(e.verify_message, msg)
3060 self.assertIn(msg, repr(e))
3061 self.assertIn('certificate verify failed', repr(e))
3062
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003063 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3064 "OpenSSL is compiled without SSLv2 support")
3065 def test_protocol_sslv2(self):
3066 """Connecting to an SSLv2 server with various client options"""
3067 if support.verbose:
3068 sys.stdout.write("\n")
3069 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3070 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3071 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003072 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003073 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3074 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3075 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3076 # SSLv23 client with specific SSL options
3077 if no_sslv2_implies_sslv3_hello():
3078 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003079 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003080 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003081 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003082 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003083 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003084 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003085
Christian Heimesa170fa12017-09-15 20:27:30 +02003086 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003087 """Connecting to an SSLv23 server with various client options"""
3088 if support.verbose:
3089 sys.stdout.write("\n")
3090 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003091 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003092 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 except OSError as x:
3094 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3095 if support.verbose:
3096 sys.stdout.write(
3097 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3098 % str(x))
3099 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003100 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3101 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3102 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003104 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003105 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3106 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3107 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003108
3109 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003110 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3111 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3112 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003113
3114 # Server with specific SSL options
3115 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003116 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003117 server_options=ssl.OP_NO_SSLv3)
3118 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003119 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003120 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003121 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003122 server_options=ssl.OP_NO_TLSv1)
3123
3124
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003125 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3126 "OpenSSL is compiled without SSLv3 support")
3127 def test_protocol_sslv3(self):
3128 """Connecting to an SSLv3 server with various client options"""
3129 if support.verbose:
3130 sys.stdout.write("\n")
3131 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3132 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3133 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3134 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3135 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003136 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003137 client_options=ssl.OP_NO_SSLv3)
3138 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3139 if no_sslv2_implies_sslv3_hello():
3140 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003141 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003142 False, client_options=ssl.OP_NO_SSLv2)
3143
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003144 def test_protocol_tlsv1(self):
3145 """Connecting to a TLSv1 server with various client options"""
3146 if support.verbose:
3147 sys.stdout.write("\n")
3148 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3149 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3150 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3151 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3152 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3153 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3154 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003155 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003156 client_options=ssl.OP_NO_TLSv1)
3157
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003158 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3159 "TLS version 1.1 not supported.")
3160 def test_protocol_tlsv1_1(self):
3161 """Connecting to a TLSv1.1 server with various client options.
3162 Testing against older TLS versions."""
3163 if support.verbose:
3164 sys.stdout.write("\n")
3165 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3166 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3167 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3168 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3169 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003170 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003171 client_options=ssl.OP_NO_TLSv1_1)
3172
Christian Heimesa170fa12017-09-15 20:27:30 +02003173 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003174 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3175 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3176
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003177 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3178 "TLS version 1.2 not supported.")
3179 def test_protocol_tlsv1_2(self):
3180 """Connecting to a TLSv1.2 server with various client options.
3181 Testing against older TLS versions."""
3182 if support.verbose:
3183 sys.stdout.write("\n")
3184 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3185 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3186 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3187 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3188 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3189 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3190 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003191 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003192 client_options=ssl.OP_NO_TLSv1_2)
3193
Christian Heimesa170fa12017-09-15 20:27:30 +02003194 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003195 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3196 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3197 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3198 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3199
3200 def test_starttls(self):
3201 """Switching from clear text to encrypted and back again."""
3202 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3203
3204 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003205 starttls_server=True,
3206 chatty=True,
3207 connectionchatty=True)
3208 wrapped = False
3209 with server:
3210 s = socket.socket()
3211 s.setblocking(1)
3212 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003213 if support.verbose:
3214 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003215 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003216 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003217 sys.stdout.write(
3218 " client: sending %r...\n" % indata)
3219 if wrapped:
3220 conn.write(indata)
3221 outdata = conn.read()
3222 else:
3223 s.send(indata)
3224 outdata = s.recv(1024)
3225 msg = outdata.strip().lower()
3226 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3227 # STARTTLS ok, switch to secure mode
3228 if support.verbose:
3229 sys.stdout.write(
3230 " client: read %r from server, starting TLS...\n"
3231 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003232 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003233 wrapped = True
3234 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3235 # ENDTLS ok, switch back to clear text
3236 if support.verbose:
3237 sys.stdout.write(
3238 " client: read %r from server, ending TLS...\n"
3239 % msg)
3240 s = conn.unwrap()
3241 wrapped = False
3242 else:
3243 if support.verbose:
3244 sys.stdout.write(
3245 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003246 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003247 sys.stdout.write(" client: closing connection.\n")
3248 if wrapped:
3249 conn.write(b"over\n")
3250 else:
3251 s.send(b"over\n")
3252 if wrapped:
3253 conn.close()
3254 else:
3255 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003256
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003257 def test_socketserver(self):
3258 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003259 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003260 # try to connect
3261 if support.verbose:
3262 sys.stdout.write('\n')
3263 with open(CERTFILE, 'rb') as f:
3264 d1 = f.read()
3265 d2 = ''
3266 # now fetch the same data from the HTTPS server
3267 url = 'https://localhost:%d/%s' % (
3268 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003269 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003270 f = urllib.request.urlopen(url, context=context)
3271 try:
3272 dlen = f.info().get("content-length")
3273 if dlen and (int(dlen) > 0):
3274 d2 = f.read(int(dlen))
3275 if support.verbose:
3276 sys.stdout.write(
3277 " client: read %d bytes from remote server '%s'\n"
3278 % (len(d2), server))
3279 finally:
3280 f.close()
3281 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003282
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003283 def test_asyncore_server(self):
3284 """Check the example asyncore integration."""
3285 if support.verbose:
3286 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003288 indata = b"FOO\n"
3289 server = AsyncoreEchoServer(CERTFILE)
3290 with server:
3291 s = test_wrap_socket(socket.socket())
3292 s.connect(('127.0.0.1', server.port))
3293 if support.verbose:
3294 sys.stdout.write(
3295 " client: sending %r...\n" % indata)
3296 s.write(indata)
3297 outdata = s.read()
3298 if support.verbose:
3299 sys.stdout.write(" client: read %r\n" % outdata)
3300 if outdata != indata.lower():
3301 self.fail(
3302 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3303 % (outdata[:20], len(outdata),
3304 indata[:20].lower(), len(indata)))
3305 s.write(b"over\n")
3306 if support.verbose:
3307 sys.stdout.write(" client: closing connection.\n")
3308 s.close()
3309 if support.verbose:
3310 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003311
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 def test_recv_send(self):
3313 """Test recv(), send() and friends."""
3314 if support.verbose:
3315 sys.stdout.write("\n")
3316
3317 server = ThreadedEchoServer(CERTFILE,
3318 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003319 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003320 cacerts=CERTFILE,
3321 chatty=True,
3322 connectionchatty=False)
3323 with server:
3324 s = test_wrap_socket(socket.socket(),
3325 server_side=False,
3326 certfile=CERTFILE,
3327 ca_certs=CERTFILE,
3328 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003329 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 s.connect((HOST, server.port))
3331 # helper methods for standardising recv* method signatures
3332 def _recv_into():
3333 b = bytearray(b"\0"*100)
3334 count = s.recv_into(b)
3335 return b[:count]
3336
3337 def _recvfrom_into():
3338 b = bytearray(b"\0"*100)
3339 count, addr = s.recvfrom_into(b)
3340 return b[:count]
3341
3342 # (name, method, expect success?, *args, return value func)
3343 send_methods = [
3344 ('send', s.send, True, [], len),
3345 ('sendto', s.sendto, False, ["some.address"], len),
3346 ('sendall', s.sendall, True, [], lambda x: None),
3347 ]
3348 # (name, method, whether to expect success, *args)
3349 recv_methods = [
3350 ('recv', s.recv, True, []),
3351 ('recvfrom', s.recvfrom, False, ["some.address"]),
3352 ('recv_into', _recv_into, True, []),
3353 ('recvfrom_into', _recvfrom_into, False, []),
3354 ]
3355 data_prefix = "PREFIX_"
3356
3357 for (meth_name, send_meth, expect_success, args,
3358 ret_val_meth) in send_methods:
3359 indata = (data_prefix + meth_name).encode('ascii')
3360 try:
3361 ret = send_meth(indata, *args)
3362 msg = "sending with {}".format(meth_name)
3363 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3364 outdata = s.read()
3365 if outdata != indata.lower():
3366 self.fail(
3367 "While sending with <<{name:s}>> bad data "
3368 "<<{outdata:r}>> ({nout:d}) received; "
3369 "expected <<{indata:r}>> ({nin:d})\n".format(
3370 name=meth_name, outdata=outdata[:20],
3371 nout=len(outdata),
3372 indata=indata[:20], nin=len(indata)
3373 )
3374 )
3375 except ValueError as e:
3376 if expect_success:
3377 self.fail(
3378 "Failed to send with method <<{name:s}>>; "
3379 "expected to succeed.\n".format(name=meth_name)
3380 )
3381 if not str(e).startswith(meth_name):
3382 self.fail(
3383 "Method <<{name:s}>> failed with unexpected "
3384 "exception message: {exp:s}\n".format(
3385 name=meth_name, exp=e
3386 )
3387 )
3388
3389 for meth_name, recv_meth, expect_success, args in recv_methods:
3390 indata = (data_prefix + meth_name).encode('ascii')
3391 try:
3392 s.send(indata)
3393 outdata = recv_meth(*args)
3394 if outdata != indata.lower():
3395 self.fail(
3396 "While receiving with <<{name:s}>> bad data "
3397 "<<{outdata:r}>> ({nout:d}) received; "
3398 "expected <<{indata:r}>> ({nin:d})\n".format(
3399 name=meth_name, outdata=outdata[:20],
3400 nout=len(outdata),
3401 indata=indata[:20], nin=len(indata)
3402 )
3403 )
3404 except ValueError as e:
3405 if expect_success:
3406 self.fail(
3407 "Failed to receive with method <<{name:s}>>; "
3408 "expected to succeed.\n".format(name=meth_name)
3409 )
3410 if not str(e).startswith(meth_name):
3411 self.fail(
3412 "Method <<{name:s}>> failed with unexpected "
3413 "exception message: {exp:s}\n".format(
3414 name=meth_name, exp=e
3415 )
3416 )
3417 # consume data
3418 s.read()
3419
3420 # read(-1, buffer) is supported, even though read(-1) is not
3421 data = b"data"
3422 s.send(data)
3423 buffer = bytearray(len(data))
3424 self.assertEqual(s.read(-1, buffer), len(data))
3425 self.assertEqual(buffer, data)
3426
Christian Heimes888bbdc2017-09-07 14:18:21 -07003427 # sendall accepts bytes-like objects
3428 if ctypes is not None:
3429 ubyte = ctypes.c_ubyte * len(data)
3430 byteslike = ubyte.from_buffer_copy(data)
3431 s.sendall(byteslike)
3432 self.assertEqual(s.read(), data)
3433
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003434 # Make sure sendmsg et al are disallowed to avoid
3435 # inadvertent disclosure of data and/or corruption
3436 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003437 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003438 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3439 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3440 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003441 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003442 s.write(b"over\n")
3443
3444 self.assertRaises(ValueError, s.recv, -1)
3445 self.assertRaises(ValueError, s.read, -1)
3446
3447 s.close()
3448
3449 def test_recv_zero(self):
3450 server = ThreadedEchoServer(CERTFILE)
3451 server.__enter__()
3452 self.addCleanup(server.__exit__, None, None)
3453 s = socket.create_connection((HOST, server.port))
3454 self.addCleanup(s.close)
3455 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3456 self.addCleanup(s.close)
3457
3458 # recv/read(0) should return no data
3459 s.send(b"data")
3460 self.assertEqual(s.recv(0), b"")
3461 self.assertEqual(s.read(0), b"")
3462 self.assertEqual(s.read(), b"data")
3463
3464 # Should not block if the other end sends no data
3465 s.setblocking(False)
3466 self.assertEqual(s.recv(0), b"")
3467 self.assertEqual(s.recv_into(bytearray()), 0)
3468
3469 def test_nonblocking_send(self):
3470 server = ThreadedEchoServer(CERTFILE,
3471 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003472 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003473 cacerts=CERTFILE,
3474 chatty=True,
3475 connectionchatty=False)
3476 with server:
3477 s = test_wrap_socket(socket.socket(),
3478 server_side=False,
3479 certfile=CERTFILE,
3480 ca_certs=CERTFILE,
3481 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003482 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003483 s.connect((HOST, server.port))
3484 s.setblocking(False)
3485
3486 # If we keep sending data, at some point the buffers
3487 # will be full and the call will block
3488 buf = bytearray(8192)
3489 def fill_buffer():
3490 while True:
3491 s.send(buf)
3492 self.assertRaises((ssl.SSLWantWriteError,
3493 ssl.SSLWantReadError), fill_buffer)
3494
3495 # Now read all the output and discard it
3496 s.setblocking(True)
3497 s.close()
3498
3499 def test_handshake_timeout(self):
3500 # Issue #5103: SSL handshake must respect the socket timeout
3501 server = socket.socket(socket.AF_INET)
3502 host = "127.0.0.1"
3503 port = support.bind_port(server)
3504 started = threading.Event()
3505 finish = False
3506
3507 def serve():
3508 server.listen()
3509 started.set()
3510 conns = []
3511 while not finish:
3512 r, w, e = select.select([server], [], [], 0.1)
3513 if server in r:
3514 # Let the socket hang around rather than having
3515 # it closed by garbage collection.
3516 conns.append(server.accept()[0])
3517 for sock in conns:
3518 sock.close()
3519
3520 t = threading.Thread(target=serve)
3521 t.start()
3522 started.wait()
3523
3524 try:
3525 try:
3526 c = socket.socket(socket.AF_INET)
3527 c.settimeout(0.2)
3528 c.connect((host, port))
3529 # Will attempt handshake and time out
3530 self.assertRaisesRegex(socket.timeout, "timed out",
3531 test_wrap_socket, c)
3532 finally:
3533 c.close()
3534 try:
3535 c = socket.socket(socket.AF_INET)
3536 c = test_wrap_socket(c)
3537 c.settimeout(0.2)
3538 # Will attempt handshake and time out
3539 self.assertRaisesRegex(socket.timeout, "timed out",
3540 c.connect, (host, port))
3541 finally:
3542 c.close()
3543 finally:
3544 finish = True
3545 t.join()
3546 server.close()
3547
3548 def test_server_accept(self):
3549 # Issue #16357: accept() on a SSLSocket created through
3550 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003551 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003552 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003553 context.load_verify_locations(SIGNING_CA)
3554 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003555 server = socket.socket(socket.AF_INET)
3556 host = "127.0.0.1"
3557 port = support.bind_port(server)
3558 server = context.wrap_socket(server, server_side=True)
3559 self.assertTrue(server.server_side)
3560
3561 evt = threading.Event()
3562 remote = None
3563 peer = None
3564 def serve():
3565 nonlocal remote, peer
3566 server.listen()
3567 # Block on the accept and wait on the connection to close.
3568 evt.set()
3569 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003570 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003571
3572 t = threading.Thread(target=serve)
3573 t.start()
3574 # Client wait until server setup and perform a connect.
3575 evt.wait()
3576 client = context.wrap_socket(socket.socket())
3577 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003578 client.send(b'data')
3579 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003580 client_addr = client.getsockname()
3581 client.close()
3582 t.join()
3583 remote.close()
3584 server.close()
3585 # Sanity checks.
3586 self.assertIsInstance(remote, ssl.SSLSocket)
3587 self.assertEqual(peer, client_addr)
3588
3589 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003590 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003591 with context.wrap_socket(socket.socket()) as sock:
3592 with self.assertRaises(OSError) as cm:
3593 sock.getpeercert()
3594 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3595
3596 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003597 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003598 with context.wrap_socket(socket.socket()) as sock:
3599 with self.assertRaises(OSError) as cm:
3600 sock.do_handshake()
3601 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3602
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003603 def test_no_shared_ciphers(self):
3604 client_context, server_context, hostname = testing_context()
3605 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3606 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003607 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003608 client_context.set_ciphers("AES128")
3609 server_context.set_ciphers("AES256")
3610 with ThreadedEchoServer(context=server_context) as server:
3611 with client_context.wrap_socket(socket.socket(),
3612 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003613 with self.assertRaises(OSError):
3614 s.connect((HOST, server.port))
3615 self.assertIn("no shared cipher", server.conn_errors[0])
3616
3617 def test_version_basic(self):
3618 """
3619 Basic tests for SSLSocket.version().
3620 More tests are done in the test_protocol_*() methods.
3621 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003622 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3623 context.check_hostname = False
3624 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003625 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003626 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003627 chatty=False) as server:
3628 with context.wrap_socket(socket.socket()) as s:
3629 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003630 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003631 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003632 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003633 self.assertEqual(s.version(), 'TLSv1.3')
3634 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003635 self.assertEqual(s.version(), 'TLSv1.2')
3636 else: # 0.9.8 to 1.0.1
3637 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003638 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003639 self.assertIs(s.version(), None)
3640
Christian Heimescb5b68a2017-09-07 18:07:00 -07003641 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3642 "test requires TLSv1.3 enabled OpenSSL")
3643 def test_tls1_3(self):
3644 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3645 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003646 context.options |= (
3647 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3648 )
3649 with ThreadedEchoServer(context=context) as server:
3650 with context.wrap_socket(socket.socket()) as s:
3651 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003652 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003653 'TLS_AES_256_GCM_SHA384',
3654 'TLS_CHACHA20_POLY1305_SHA256',
3655 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003656 })
3657 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003658
Christian Heimes698dde12018-02-27 11:54:43 +01003659 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3660 "required OpenSSL 1.1.0g")
3661 def test_min_max_version(self):
3662 client_context, server_context, hostname = testing_context()
3663 # client TLSv1.0 to 1.2
3664 client_context.minimum_version = ssl.TLSVersion.TLSv1
3665 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3666 # server only TLSv1.2
3667 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3668 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3669
3670 with ThreadedEchoServer(context=server_context) as server:
3671 with client_context.wrap_socket(socket.socket(),
3672 server_hostname=hostname) as s:
3673 s.connect((HOST, server.port))
3674 self.assertEqual(s.version(), 'TLSv1.2')
3675
3676 # client 1.0 to 1.2, server 1.0 to 1.1
3677 server_context.minimum_version = ssl.TLSVersion.TLSv1
3678 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3679
3680 with ThreadedEchoServer(context=server_context) as server:
3681 with client_context.wrap_socket(socket.socket(),
3682 server_hostname=hostname) as s:
3683 s.connect((HOST, server.port))
3684 self.assertEqual(s.version(), 'TLSv1.1')
3685
3686 # client 1.0, server 1.2 (mismatch)
3687 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3688 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3689 client_context.minimum_version = ssl.TLSVersion.TLSv1
3690 client_context.maximum_version = ssl.TLSVersion.TLSv1
3691 with ThreadedEchoServer(context=server_context) as server:
3692 with client_context.wrap_socket(socket.socket(),
3693 server_hostname=hostname) as s:
3694 with self.assertRaises(ssl.SSLError) as e:
3695 s.connect((HOST, server.port))
3696 self.assertIn("alert", str(e.exception))
3697
3698
3699 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3700 "required OpenSSL 1.1.0g")
3701 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3702 def test_min_max_version_sslv3(self):
3703 client_context, server_context, hostname = testing_context()
3704 server_context.minimum_version = ssl.TLSVersion.SSLv3
3705 client_context.minimum_version = ssl.TLSVersion.SSLv3
3706 client_context.maximum_version = ssl.TLSVersion.SSLv3
3707 with ThreadedEchoServer(context=server_context) as server:
3708 with client_context.wrap_socket(socket.socket(),
3709 server_hostname=hostname) as s:
3710 s.connect((HOST, server.port))
3711 self.assertEqual(s.version(), 'SSLv3')
3712
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003713 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3714 def test_default_ecdh_curve(self):
3715 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3716 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003717 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003718 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003719 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3720 # cipher name.
3721 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003722 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3723 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3724 # our default cipher list should prefer ECDH-based ciphers
3725 # automatically.
3726 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3727 context.set_ciphers("ECCdraft:ECDH")
3728 with ThreadedEchoServer(context=context) as server:
3729 with context.wrap_socket(socket.socket()) as s:
3730 s.connect((HOST, server.port))
3731 self.assertIn("ECDH", s.cipher()[0])
3732
3733 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3734 "'tls-unique' channel binding not available")
3735 def test_tls_unique_channel_binding(self):
3736 """Test tls-unique channel binding."""
3737 if support.verbose:
3738 sys.stdout.write("\n")
3739
Christian Heimes05d9fe32018-02-27 08:55:39 +01003740 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003741
3742 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003743 chatty=True,
3744 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003745
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003746 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003747 with client_context.wrap_socket(
3748 socket.socket(),
3749 server_hostname=hostname) as s:
3750 s.connect((HOST, server.port))
3751 # get the data
3752 cb_data = s.get_channel_binding("tls-unique")
3753 if support.verbose:
3754 sys.stdout.write(
3755 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003756
Christian Heimes05d9fe32018-02-27 08:55:39 +01003757 # check if it is sane
3758 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003759 if s.version() == 'TLSv1.3':
3760 self.assertEqual(len(cb_data), 48)
3761 else:
3762 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003763
Christian Heimes05d9fe32018-02-27 08:55:39 +01003764 # and compare with the peers version
3765 s.write(b"CB tls-unique\n")
3766 peer_data_repr = s.read().strip()
3767 self.assertEqual(peer_data_repr,
3768 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769
3770 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003771 with client_context.wrap_socket(
3772 socket.socket(),
3773 server_hostname=hostname) as s:
3774 s.connect((HOST, server.port))
3775 new_cb_data = s.get_channel_binding("tls-unique")
3776 if support.verbose:
3777 sys.stdout.write(
3778 "got another channel binding data: {0!r}\n".format(
3779 new_cb_data)
3780 )
3781 # is it really unique
3782 self.assertNotEqual(cb_data, new_cb_data)
3783 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003784 if s.version() == 'TLSv1.3':
3785 self.assertEqual(len(cb_data), 48)
3786 else:
3787 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003788 s.write(b"CB tls-unique\n")
3789 peer_data_repr = s.read().strip()
3790 self.assertEqual(peer_data_repr,
3791 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003792
3793 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003794 client_context, server_context, hostname = testing_context()
3795 stats = server_params_test(client_context, server_context,
3796 chatty=True, connectionchatty=True,
3797 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003798 if support.verbose:
3799 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3800 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3801
3802 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3803 "ssl.OP_NO_COMPRESSION needed for this test")
3804 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003805 client_context, server_context, hostname = testing_context()
3806 client_context.options |= ssl.OP_NO_COMPRESSION
3807 server_context.options |= ssl.OP_NO_COMPRESSION
3808 stats = server_params_test(client_context, server_context,
3809 chatty=True, connectionchatty=True,
3810 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003811 self.assertIs(stats['compression'], None)
3812
3813 def test_dh_params(self):
3814 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003815 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003816 # test scenario needs TLS <= 1.2
3817 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003818 server_context.load_dh_params(DHFILE)
3819 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003820 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003821 stats = server_params_test(client_context, server_context,
3822 chatty=True, connectionchatty=True,
3823 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003824 cipher = stats["cipher"][0]
3825 parts = cipher.split("-")
3826 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3827 self.fail("Non-DH cipher: " + cipher[0])
3828
Christian Heimesb7b92252018-02-25 09:49:31 +01003829 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003830 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003831 def test_ecdh_curve(self):
3832 # server secp384r1, client auto
3833 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003834
Christian Heimesb7b92252018-02-25 09:49:31 +01003835 server_context.set_ecdh_curve("secp384r1")
3836 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3837 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3838 stats = server_params_test(client_context, server_context,
3839 chatty=True, connectionchatty=True,
3840 sni_name=hostname)
3841
3842 # server auto, client secp384r1
3843 client_context, server_context, hostname = testing_context()
3844 client_context.set_ecdh_curve("secp384r1")
3845 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3846 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3847 stats = server_params_test(client_context, server_context,
3848 chatty=True, connectionchatty=True,
3849 sni_name=hostname)
3850
3851 # server / client curve mismatch
3852 client_context, server_context, hostname = testing_context()
3853 client_context.set_ecdh_curve("prime256v1")
3854 server_context.set_ecdh_curve("secp384r1")
3855 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3856 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3857 try:
3858 stats = server_params_test(client_context, server_context,
3859 chatty=True, connectionchatty=True,
3860 sni_name=hostname)
3861 except ssl.SSLError:
3862 pass
3863 else:
3864 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003865 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003866 self.fail("mismatch curve did not fail")
3867
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003868 def test_selected_alpn_protocol(self):
3869 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003870 client_context, server_context, hostname = testing_context()
3871 stats = server_params_test(client_context, server_context,
3872 chatty=True, connectionchatty=True,
3873 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003874 self.assertIs(stats['client_alpn_protocol'], None)
3875
3876 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3877 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3878 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003879 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003880 server_context.set_alpn_protocols(['foo', 'bar'])
3881 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003882 chatty=True, connectionchatty=True,
3883 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003884 self.assertIs(stats['client_alpn_protocol'], None)
3885
3886 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3887 def test_alpn_protocols(self):
3888 server_protocols = ['foo', 'bar', 'milkshake']
3889 protocol_tests = [
3890 (['foo', 'bar'], 'foo'),
3891 (['bar', 'foo'], 'foo'),
3892 (['milkshake'], 'milkshake'),
3893 (['http/3.0', 'http/4.0'], None)
3894 ]
3895 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003896 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003897 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003898 client_context.set_alpn_protocols(client_protocols)
3899
3900 try:
3901 stats = server_params_test(client_context,
3902 server_context,
3903 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003904 connectionchatty=True,
3905 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003906 except ssl.SSLError as e:
3907 stats = e
3908
Christian Heimes05d9fe32018-02-27 08:55:39 +01003909 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3911 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3912 self.assertIsInstance(stats, ssl.SSLError)
3913 else:
3914 msg = "failed trying %s (s) and %s (c).\n" \
3915 "was expecting %s, but got %%s from the %%s" \
3916 % (str(server_protocols), str(client_protocols),
3917 str(expected))
3918 client_result = stats['client_alpn_protocol']
3919 self.assertEqual(client_result, expected,
3920 msg % (client_result, "client"))
3921 server_result = stats['server_alpn_protocols'][-1] \
3922 if len(stats['server_alpn_protocols']) else 'nothing'
3923 self.assertEqual(server_result, expected,
3924 msg % (server_result, "server"))
3925
3926 def test_selected_npn_protocol(self):
3927 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003928 client_context, server_context, hostname = testing_context()
3929 stats = server_params_test(client_context, server_context,
3930 chatty=True, connectionchatty=True,
3931 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003932 self.assertIs(stats['client_npn_protocol'], None)
3933
3934 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3935 def test_npn_protocols(self):
3936 server_protocols = ['http/1.1', 'spdy/2']
3937 protocol_tests = [
3938 (['http/1.1', 'spdy/2'], 'http/1.1'),
3939 (['spdy/2', 'http/1.1'], 'http/1.1'),
3940 (['spdy/2', 'test'], 'spdy/2'),
3941 (['abc', 'def'], 'abc')
3942 ]
3943 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003944 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003945 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003946 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003947 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003948 chatty=True, connectionchatty=True,
3949 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003950 msg = "failed trying %s (s) and %s (c).\n" \
3951 "was expecting %s, but got %%s from the %%s" \
3952 % (str(server_protocols), str(client_protocols),
3953 str(expected))
3954 client_result = stats['client_npn_protocol']
3955 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3956 server_result = stats['server_npn_protocols'][-1] \
3957 if len(stats['server_npn_protocols']) else 'nothing'
3958 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3959
3960 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003961 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003962 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003963 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003964 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003965 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003966 client_context.load_verify_locations(SIGNING_CA)
3967 return server_context, other_context, client_context
3968
3969 def check_common_name(self, stats, name):
3970 cert = stats['peercert']
3971 self.assertIn((('commonName', name),), cert['subject'])
3972
3973 @needs_sni
3974 def test_sni_callback(self):
3975 calls = []
3976 server_context, other_context, client_context = self.sni_contexts()
3977
Christian Heimesa170fa12017-09-15 20:27:30 +02003978 client_context.check_hostname = False
3979
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003980 def servername_cb(ssl_sock, server_name, initial_context):
3981 calls.append((server_name, initial_context))
3982 if server_name is not None:
3983 ssl_sock.context = other_context
3984 server_context.set_servername_callback(servername_cb)
3985
3986 stats = server_params_test(client_context, server_context,
3987 chatty=True,
3988 sni_name='supermessage')
3989 # The hostname was fetched properly, and the certificate was
3990 # changed for the connection.
3991 self.assertEqual(calls, [("supermessage", server_context)])
3992 # CERTFILE4 was selected
3993 self.check_common_name(stats, 'fakehostname')
3994
3995 calls = []
3996 # The callback is called with server_name=None
3997 stats = server_params_test(client_context, server_context,
3998 chatty=True,
3999 sni_name=None)
4000 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02004001 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004002
4003 # Check disabling the callback
4004 calls = []
4005 server_context.set_servername_callback(None)
4006
4007 stats = server_params_test(client_context, server_context,
4008 chatty=True,
4009 sni_name='notfunny')
4010 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02004011 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004012 self.assertEqual(calls, [])
4013
4014 @needs_sni
4015 def test_sni_callback_alert(self):
4016 # Returning a TLS alert is reflected to the connecting client
4017 server_context, other_context, client_context = self.sni_contexts()
4018
4019 def cb_returning_alert(ssl_sock, server_name, initial_context):
4020 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
4021 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004022 with self.assertRaises(ssl.SSLError) as cm:
4023 stats = server_params_test(client_context, server_context,
4024 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004025 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004026 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004027
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004028 @needs_sni
4029 def test_sni_callback_raising(self):
4030 # Raising fails the connection with a TLS handshake failure alert.
4031 server_context, other_context, client_context = self.sni_contexts()
4032
4033 def cb_raising(ssl_sock, server_name, initial_context):
4034 1/0
4035 server_context.set_servername_callback(cb_raising)
4036
4037 with self.assertRaises(ssl.SSLError) as cm, \
4038 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004039 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004040 chatty=False,
4041 sni_name='supermessage')
4042 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4043 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004044
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004045 @needs_sni
4046 def test_sni_callback_wrong_return_type(self):
4047 # Returning the wrong return type terminates the TLS connection
4048 # with an internal error alert.
4049 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004050
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004051 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4052 return "foo"
4053 server_context.set_servername_callback(cb_wrong_return_type)
4054
4055 with self.assertRaises(ssl.SSLError) as cm, \
4056 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004057 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004058 chatty=False,
4059 sni_name='supermessage')
4060 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4061 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004062
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004063 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004064 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004065 client_context.set_ciphers("AES128:AES256")
4066 server_context.set_ciphers("AES256")
4067 expected_algs = [
4068 "AES256", "AES-256",
4069 # TLS 1.3 ciphers are always enabled
4070 "TLS_CHACHA20", "TLS_AES",
4071 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004072
Christian Heimesa170fa12017-09-15 20:27:30 +02004073 stats = server_params_test(client_context, server_context,
4074 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004075 ciphers = stats['server_shared_ciphers'][0]
4076 self.assertGreater(len(ciphers), 0)
4077 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004078 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004079 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004080
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004081 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004082 client_context, server_context, hostname = testing_context()
4083 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004084
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004085 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004086 s = client_context.wrap_socket(socket.socket(),
4087 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004088 s.connect((HOST, server.port))
4089 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004090
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004091 self.assertRaises(ValueError, s.read, 1024)
4092 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004093
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004094 def test_sendfile(self):
4095 TEST_DATA = b"x" * 512
4096 with open(support.TESTFN, 'wb') as f:
4097 f.write(TEST_DATA)
4098 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004099 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004100 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004101 context.load_verify_locations(SIGNING_CA)
4102 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004103 server = ThreadedEchoServer(context=context, chatty=False)
4104 with server:
4105 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004106 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004107 with open(support.TESTFN, 'rb') as file:
4108 s.sendfile(file)
4109 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004110
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004111 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004112 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004113 # TODO: sessions aren't compatible with TLSv1.3 yet
4114 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004115
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004116 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004117 stats = server_params_test(client_context, server_context,
4118 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004119 session = stats['session']
4120 self.assertTrue(session.id)
4121 self.assertGreater(session.time, 0)
4122 self.assertGreater(session.timeout, 0)
4123 self.assertTrue(session.has_ticket)
4124 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4125 self.assertGreater(session.ticket_lifetime_hint, 0)
4126 self.assertFalse(stats['session_reused'])
4127 sess_stat = server_context.session_stats()
4128 self.assertEqual(sess_stat['accept'], 1)
4129 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004130
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004131 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004132 stats = server_params_test(client_context, server_context,
4133 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004134 sess_stat = server_context.session_stats()
4135 self.assertEqual(sess_stat['accept'], 2)
4136 self.assertEqual(sess_stat['hits'], 1)
4137 self.assertTrue(stats['session_reused'])
4138 session2 = stats['session']
4139 self.assertEqual(session2.id, session.id)
4140 self.assertEqual(session2, session)
4141 self.assertIsNot(session2, session)
4142 self.assertGreaterEqual(session2.time, session.time)
4143 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004144
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004145 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004146 stats = server_params_test(client_context, server_context,
4147 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004148 self.assertFalse(stats['session_reused'])
4149 session3 = stats['session']
4150 self.assertNotEqual(session3.id, session.id)
4151 self.assertNotEqual(session3, session)
4152 sess_stat = server_context.session_stats()
4153 self.assertEqual(sess_stat['accept'], 3)
4154 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004155
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004156 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004157 stats = server_params_test(client_context, server_context,
4158 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004159 self.assertTrue(stats['session_reused'])
4160 session4 = stats['session']
4161 self.assertEqual(session4.id, session.id)
4162 self.assertEqual(session4, session)
4163 self.assertGreaterEqual(session4.time, session.time)
4164 self.assertGreaterEqual(session4.timeout, session.timeout)
4165 sess_stat = server_context.session_stats()
4166 self.assertEqual(sess_stat['accept'], 4)
4167 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004169 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004170 client_context, server_context, hostname = testing_context()
4171 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004172
Christian Heimes05d9fe32018-02-27 08:55:39 +01004173 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004174 client_context.options |= ssl.OP_NO_TLSv1_3
4175 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004176
Christian Heimesa170fa12017-09-15 20:27:30 +02004177 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004178 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004179 with client_context.wrap_socket(socket.socket(),
4180 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004181 # session is None before handshake
4182 self.assertEqual(s.session, None)
4183 self.assertEqual(s.session_reused, None)
4184 s.connect((HOST, server.port))
4185 session = s.session
4186 self.assertTrue(session)
4187 with self.assertRaises(TypeError) as e:
4188 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004189 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004190
Christian Heimesa170fa12017-09-15 20:27:30 +02004191 with client_context.wrap_socket(socket.socket(),
4192 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004193 s.connect((HOST, server.port))
4194 # cannot set session after handshake
4195 with self.assertRaises(ValueError) as e:
4196 s.session = session
4197 self.assertEqual(str(e.exception),
4198 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004199
Christian Heimesa170fa12017-09-15 20:27:30 +02004200 with client_context.wrap_socket(socket.socket(),
4201 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004202 # can set session before handshake and before the
4203 # connection was established
4204 s.session = session
4205 s.connect((HOST, server.port))
4206 self.assertEqual(s.session.id, session.id)
4207 self.assertEqual(s.session, session)
4208 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004209
Christian Heimesa170fa12017-09-15 20:27:30 +02004210 with client_context2.wrap_socket(socket.socket(),
4211 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004212 # cannot re-use session with a different SSLContext
4213 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004214 s.session = session
4215 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004216 self.assertEqual(str(e.exception),
4217 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004218
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004219
Christian Heimes9fb051f2018-09-23 08:32:31 +02004220@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4221class TestPostHandshakeAuth(unittest.TestCase):
4222 def test_pha_setter(self):
4223 protocols = [
4224 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4225 ]
4226 for protocol in protocols:
4227 ctx = ssl.SSLContext(protocol)
4228 self.assertEqual(ctx.post_handshake_auth, False)
4229
4230 ctx.post_handshake_auth = True
4231 self.assertEqual(ctx.post_handshake_auth, True)
4232
4233 ctx.verify_mode = ssl.CERT_REQUIRED
4234 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4235 self.assertEqual(ctx.post_handshake_auth, True)
4236
4237 ctx.post_handshake_auth = False
4238 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4239 self.assertEqual(ctx.post_handshake_auth, False)
4240
4241 ctx.verify_mode = ssl.CERT_OPTIONAL
4242 ctx.post_handshake_auth = True
4243 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4244 self.assertEqual(ctx.post_handshake_auth, True)
4245
4246 def test_pha_required(self):
4247 client_context, server_context, hostname = testing_context()
4248 server_context.post_handshake_auth = True
4249 server_context.verify_mode = ssl.CERT_REQUIRED
4250 client_context.post_handshake_auth = True
4251 client_context.load_cert_chain(SIGNED_CERTFILE)
4252
4253 server = ThreadedEchoServer(context=server_context, chatty=False)
4254 with server:
4255 with client_context.wrap_socket(socket.socket(),
4256 server_hostname=hostname) as s:
4257 s.connect((HOST, server.port))
4258 s.write(b'HASCERT')
4259 self.assertEqual(s.recv(1024), b'FALSE\n')
4260 s.write(b'PHA')
4261 self.assertEqual(s.recv(1024), b'OK\n')
4262 s.write(b'HASCERT')
4263 self.assertEqual(s.recv(1024), b'TRUE\n')
4264 # PHA method just returns true when cert is already available
4265 s.write(b'PHA')
4266 self.assertEqual(s.recv(1024), b'OK\n')
4267 s.write(b'GETCERT')
4268 cert_text = s.recv(4096).decode('us-ascii')
4269 self.assertIn('Python Software Foundation CA', cert_text)
4270
4271 def test_pha_required_nocert(self):
4272 client_context, server_context, hostname = testing_context()
4273 server_context.post_handshake_auth = True
4274 server_context.verify_mode = ssl.CERT_REQUIRED
4275 client_context.post_handshake_auth = True
4276
4277 server = ThreadedEchoServer(context=server_context, chatty=False)
4278 with server:
4279 with client_context.wrap_socket(socket.socket(),
4280 server_hostname=hostname) as s:
4281 s.connect((HOST, server.port))
4282 s.write(b'PHA')
4283 # receive CertificateRequest
4284 self.assertEqual(s.recv(1024), b'OK\n')
4285 # send empty Certificate + Finish
4286 s.write(b'HASCERT')
4287 # receive alert
4288 with self.assertRaisesRegex(
4289 ssl.SSLError,
4290 'tlsv13 alert certificate required'):
4291 s.recv(1024)
4292
4293 def test_pha_optional(self):
4294 if support.verbose:
4295 sys.stdout.write("\n")
4296
4297 client_context, server_context, hostname = testing_context()
4298 server_context.post_handshake_auth = True
4299 server_context.verify_mode = ssl.CERT_REQUIRED
4300 client_context.post_handshake_auth = True
4301 client_context.load_cert_chain(SIGNED_CERTFILE)
4302
4303 # check CERT_OPTIONAL
4304 server_context.verify_mode = ssl.CERT_OPTIONAL
4305 server = ThreadedEchoServer(context=server_context, chatty=False)
4306 with server:
4307 with client_context.wrap_socket(socket.socket(),
4308 server_hostname=hostname) as s:
4309 s.connect((HOST, server.port))
4310 s.write(b'HASCERT')
4311 self.assertEqual(s.recv(1024), b'FALSE\n')
4312 s.write(b'PHA')
4313 self.assertEqual(s.recv(1024), b'OK\n')
4314 s.write(b'HASCERT')
4315 self.assertEqual(s.recv(1024), b'TRUE\n')
4316
4317 def test_pha_optional_nocert(self):
4318 if support.verbose:
4319 sys.stdout.write("\n")
4320
4321 client_context, server_context, hostname = testing_context()
4322 server_context.post_handshake_auth = True
4323 server_context.verify_mode = ssl.CERT_OPTIONAL
4324 client_context.post_handshake_auth = True
4325
4326 server = ThreadedEchoServer(context=server_context, chatty=False)
4327 with server:
4328 with client_context.wrap_socket(socket.socket(),
4329 server_hostname=hostname) as s:
4330 s.connect((HOST, server.port))
4331 s.write(b'HASCERT')
4332 self.assertEqual(s.recv(1024), b'FALSE\n')
4333 s.write(b'PHA')
4334 self.assertEqual(s.recv(1024), b'OK\n')
4335 # optional doens't fail when client does not have a cert
4336 s.write(b'HASCERT')
4337 self.assertEqual(s.recv(1024), b'FALSE\n')
4338
4339 def test_pha_no_pha_client(self):
4340 client_context, server_context, hostname = testing_context()
4341 server_context.post_handshake_auth = True
4342 server_context.verify_mode = ssl.CERT_REQUIRED
4343 client_context.load_cert_chain(SIGNED_CERTFILE)
4344
4345 server = ThreadedEchoServer(context=server_context, chatty=False)
4346 with server:
4347 with client_context.wrap_socket(socket.socket(),
4348 server_hostname=hostname) as s:
4349 s.connect((HOST, server.port))
4350 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4351 s.verify_client_post_handshake()
4352 s.write(b'PHA')
4353 self.assertIn(b'extension not received', s.recv(1024))
4354
4355 def test_pha_no_pha_server(self):
4356 # server doesn't have PHA enabled, cert is requested in handshake
4357 client_context, server_context, hostname = testing_context()
4358 server_context.verify_mode = ssl.CERT_REQUIRED
4359 client_context.post_handshake_auth = True
4360 client_context.load_cert_chain(SIGNED_CERTFILE)
4361
4362 server = ThreadedEchoServer(context=server_context, chatty=False)
4363 with server:
4364 with client_context.wrap_socket(socket.socket(),
4365 server_hostname=hostname) as s:
4366 s.connect((HOST, server.port))
4367 s.write(b'HASCERT')
4368 self.assertEqual(s.recv(1024), b'TRUE\n')
4369 # PHA doesn't fail if there is already a cert
4370 s.write(b'PHA')
4371 self.assertEqual(s.recv(1024), b'OK\n')
4372 s.write(b'HASCERT')
4373 self.assertEqual(s.recv(1024), b'TRUE\n')
4374
4375 def test_pha_not_tls13(self):
4376 # TLS 1.2
4377 client_context, server_context, hostname = testing_context()
4378 server_context.verify_mode = ssl.CERT_REQUIRED
4379 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4380 client_context.post_handshake_auth = True
4381 client_context.load_cert_chain(SIGNED_CERTFILE)
4382
4383 server = ThreadedEchoServer(context=server_context, chatty=False)
4384 with server:
4385 with client_context.wrap_socket(socket.socket(),
4386 server_hostname=hostname) as s:
4387 s.connect((HOST, server.port))
4388 # PHA fails for TLS != 1.3
4389 s.write(b'PHA')
4390 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4391
4392
Thomas Woutersed03b412007-08-28 21:37:11 +00004393def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004394 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004395 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004396 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004397 'Mac': platform.mac_ver,
4398 'Windows': platform.win32_ver,
4399 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004400 for name, func in plats.items():
4401 plat = func()
4402 if plat and plat[0]:
4403 plat = '%s %r' % (name, plat)
4404 break
4405 else:
4406 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004407 print("test_ssl: testing with %r %r" %
4408 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4409 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004410 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004411 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4412 try:
4413 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4414 except AttributeError:
4415 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004416
Antoine Pitrou152efa22010-05-16 18:19:27 +00004417 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004418 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004419 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004420 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004421 BADCERT, BADKEY, EMPTYCERT]:
4422 if not os.path.exists(filename):
4423 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004424
Martin Panter3840b2a2016-03-27 01:53:46 +00004425 tests = [
4426 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004427 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes9fb051f2018-09-23 08:32:31 +02004428 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004429 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004430
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004431 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004432 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004433
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004434 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004435 try:
4436 support.run_unittest(*tests)
4437 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004438 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004439
4440if __name__ == "__main__":
4441 test_main()