blob: 55718220d88de61b49ccaa9403741957f74d04b8 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
Antoine Pitrou05d936d2010-10-13 11:38:36 +000026ssl = support.import_module("ssl")
27
Martin Panter3840b2a2016-03-27 01:53:46 +000028
# Sorted keys of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host value as provided by test.support.
HOST = support.HOST

# LibreSSL announces itself at the start of the version banner.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# The OpenSSL feature flags are never set when running against LibreSSL.
IS_OPENSSL_1_1_0 = (
    not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
)
IS_OPENSSL_1_1_1 = (
    not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
)

# Build-time default cipher setting (None when the config var is absent).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
# Maps a PROTOCOL_* constant to a ssl.TLSVersion member.  Pairs whose
# protocol constant or TLSVersion member is missing from this build are
# left out of the mapping.
PROTOCOL_TO_TLS_VERSION = {}
for proto, ver in [
    ("PROTOCOL_SSLv23", "SSLv3"),
    ("PROTOCOL_TLSv1", "TLSv1"),
    ("PROTOCOL_TLSv1_1", "TLSv1_1"),
]:
    try:
        proto = getattr(ssl, proto)
        ver = getattr(ssl.TLSVersion, ver)
    except AttributeError:
        # Not available in this ssl build: skip the pair.
        continue
    else:
        PROTOCOL_TO_TLS_VERSION[proto] = ver
48
def data_file(*name):
    """Join the path components in *name* onto this module's directory."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000051
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Paths of the basic test key/certificate files, resolved next to this module.
CERTFILE = data_file("keycert.pem")
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")

# The same three paths, encoded to bytes with the filesystem encoding.
BYTES_CERTFILE = os.fsencode(CERTFILE)
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Judging by the names, password-protected variants and their password.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory (str and bytes spellings) and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
70
# Decoded form that test_parse_cert expects for CERTFILE.
CERTFILE_INFO = dict(
    issuer=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    notAfter='Aug 26 14:23:15 2028 GMT',
    notBefore='Aug 29 14:23:15 2018 GMT',
    serialNumber='98A7CF88C74A32ED',
    subject=(
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    ),
    subjectAltName=(('DNS', 'localhost'),),
    version=3,
)
Antoine Pitrou152efa22010-05-16 18:19:27 +000086
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests).
# This is the first one; its host name constant sits right beside it.
SIGNED_CERTFILE_HOSTNAME = 'localhost'
SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesbd5c7d22018-01-20 15:16:30 +010093
# Decoded form that test_parse_cert expects for SIGNED_CERTFILE.
#
# The day-of-month field in 'notAfter'/'notBefore' is always two characters
# wide: single-digit days are padded with a leading space, not a zero (see
# the comment in asn1time()).  'Jul  7' therefore needs TWO spaces; with a
# single space the expected value can never equal the decoder's output.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
111
# Second CA-signed key/cert pair, plus an ECC one, each with its host name.
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")

# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Certificate/key fixtures used by the error-path and parsing tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
BADKEY = data_file("badkey.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")
TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")

# DH parameter file, as str and as filesystem-encoded bytes.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
def _optional_ssl_flag(name):
    """Return the ssl module attribute *name*, or 0 when it is not defined."""
    return getattr(ssl, name, 0)


# Not defined in all versions of OpenSSL
OP_NO_COMPRESSION = _optional_ssl_flag("OP_NO_COMPRESSION")
OP_SINGLE_DH_USE = _optional_ssl_flag("OP_SINGLE_DH_USE")
OP_SINGLE_ECDH_USE = _optional_ssl_flag("OP_SINGLE_ECDH_USE")
OP_CIPHER_SERVER_PREFERENCE = _optional_ssl_flag("OP_CIPHER_SERVER_PREFERENCE")
OP_ENABLE_MIDDLEBOX_COMPAT = _optional_ssl_flag("OP_ENABLE_MIDDLEBOX_COMPAT")
Christian Heimes358cfd42016-09-10 22:43:48 +0200142
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100143
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    Nothing is written unless support.verbose is set.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000148
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +0000152
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL library version is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
156
def have_verify_flags():
    """Return True when the OpenSSL library version is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
160
def _have_secp_curves():
    """Report whether a server context accepts the "secp384r1" ECDH curve.

    Returns False straight away when the ssl module has no ECDH support,
    and False when set_ecdh_curve() rejects the curve with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        probe.set_ecdh_curve("secp384r1")
    except ValueError:
        supported = False
    else:
        supported = True
    return supported


# Evaluated once, at import time.
HAVE_SECP_CURVES = _have_secp_curves()
174
175
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset (time.altzone) is
    used when the zone defines DST and it is currently in effect; the
    standard offset (time.timezone) otherwise.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
181
def asn1time(cert_time):
    """Return *cert_time* adjusted for an OpenSSL quirk.

    Only API version 0.9.8i is special-cased (it ignores seconds, see
    #18207): the seconds are zeroed and the string is re-rendered.  For
    every other version the input is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    cert_time = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000195
# Decorator that skips a test unless the ssl module reports SNI support.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
197
Antoine Pitrou23df4832010-08-04 17:14:06 +0000198
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext for *ssl_version*.

    Each keyword that is not None configures the context: verification
    mode, CA certificates, certificate chain and cipher list.  Remaining
    keyword arguments are passed to SSLContext.wrap_socket(), whose result
    is returned.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # Hostname checking is switched off first when no verification
        # is requested.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    if not (certfile is None and keyfile is None):
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    return ctx.wrap_socket(sock, **kwargs)
215
Christian Heimesa170fa12017-09-15 20:27:30 +0200216
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching pair of client and server contexts.

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  Both contexts load SIGNING_CA as verify
    location and the server context loads *server_cert* as its chain.
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for known_cert, known_hostname in known_certs:
        if server_cert == known_cert:
            hostname = known_hostname
            break
    else:
        raise ValueError(server_cert)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.load_verify_locations(SIGNING_CA)

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_cert_chain(server_cert)
    server_ctx.load_verify_locations(SIGNING_CA)

    return client_ctx, server_ctx, hostname
237
238
Antoine Pitrou152efa22010-05-16 18:19:27 +0000239class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000240
Antoine Pitrou480a1242010-04-28 21:37:09 +0000241 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000242 ssl.CERT_NONE
243 ssl.CERT_OPTIONAL
244 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100245 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100246 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100247 if ssl.HAS_ECDH:
248 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100249 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
250 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000251 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100252 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700253 ssl.OP_NO_SSLv2
254 ssl.OP_NO_SSLv3
255 ssl.OP_NO_TLSv1
256 ssl.OP_NO_TLSv1_3
257 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
258 ssl.OP_NO_TLSv1_1
259 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200260 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000261
Christian Heimes9d50ab52018-02-27 10:17:30 +0100262 def test_private_init(self):
263 with self.assertRaisesRegex(TypeError, "public constructor"):
264 with socket.socket() as s:
265 ssl.SSLSocket(s)
266
Antoine Pitrou172f0252014-04-18 20:33:08 +0200267 def test_str_for_enums(self):
268 # Make sure that the PROTOCOL_* constants have enum-like string
269 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200270 proto = ssl.PROTOCOL_TLS
271 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200272 ctx = ssl.SSLContext(proto)
273 self.assertIs(ctx.protocol, proto)
274
def test_random(self):
    # Exercise the RAND_* helpers: status, byte generation, argument
    # validation and seeding.
    seeded = ssl.RAND_status()
    if support.verbose:
        verdict = ("sufficient randomness" if seeded
                   else "insufficient randomness")
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (seeded, verdict))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, seeded == 1)
    if not seeded:
        # RAND_bytes must refuse to work without enough entropy.
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    # RAND_add is called with str, bytes and bytearray input.
    for seed in ("this is a random string",
                 b"this is a random bytes object",
                 bytearray(b"this is a random bytearray object")):
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000301
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # A forked child must not produce the same pseudo-random bytes as
    # its parent.  The child sends its bytes back through a pipe.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_fd, write_fd = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: always leaves through os._exit(), never returns.
        try:
            os.close(read_fd)
            child_bytes = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_bytes), 16)
            os.write(write_fd, child_bytes)
            os.close(write_fd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)

    # Parent only from here on.
    os.close(write_fd)
    self.addCleanup(os.close, read_fd)
    _, exit_status = os.waitpid(child_pid, 0)
    self.assertEqual(exit_status, 0)

    child_bytes = os.read(read_fd, 16)
    self.assertEqual(len(child_bytes), 16)
    parent_bytes = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(parent_bytes), 16)

    self.assertNotEqual(child_bytes, parent_bytes)
333
# unittest setting: None removes the length limit on failure diffs.
maxDiff = None
335
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode_cert = ssl._ssl._test_decode_cert
    for cert_path, expected_info in (
        (CERTFILE, CERTFILE_INFO),
        (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO),
    ):
        self.assertEqual(decode_cert(cert_path), expected_info)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (
        ('DNS', 'projects.developer.nokia.com'),
        ('DNS', 'projects.forum.nokia.com'),
    )
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    expected_issuers = ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)
    self.assertEqual(nokia['caIssuers'], expected_issuers)
    expected_crl = ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)
    self.assertEqual(nokia['crlDistributionPoints'], expected_crl)
Thomas Woutersed03b412007-08-28 21:37:11 +0000364
def test_parse_cert_CVE_2019_5010(self):
    # The TALOS_INVALID_CRLDP certificate must decode to exactly the
    # dict below.
    decoded = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    host = 'codenomicon-vm-2.test.lal.cisco.com'
    expected = {
        'issuer': (
            (('countryName', 'UK'),),
            (('commonName', 'cody-ca'),),
        ),
        'notAfter': 'Jun 14 18:00:58 2028 GMT',
        'notBefore': 'Jun 18 18:00:58 2018 GMT',
        'serialNumber': '02',
        'subject': (
            (('countryName', 'UK'),),
            (('commonName', host),),
        ),
        'subjectAltName': (('DNS', host),),
        'version': 3,
    }
    self.assertEqual(decoded, expected)
385
def test_parse_cert_CVE_2013_4238(self):
    # Names containing embedded NUL bytes must come back intact rather
    # than truncated at the NUL.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    # The certificate has the same subject and issuer.
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200414
def test_parse_all_sans(self):
    # ALLSANFILE is the cert with all kinds of subject alt names; each
    # entry must decode to the (type, value) pair listed below.
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    dirname_value = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', dirname_value),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
435
def test_DER_to_PEM(self):
    # Round-trip PEM -> DER -> PEM -> DER: both DER forms must be equal
    # and the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    expected_start = ssl.PEM_HEADER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000447
def test_openssl_version(self):
    # Type and range checks on the three version attributes, followed
    # by a check that the banner string agrees with the info tuple.
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    banner = ssl.OPENSSL_VERSION
    self.assertIsInstance(number, int)
    self.assertIsInstance(info, tuple)
    self.assertIsInstance(banner, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    # minor and fix share the same bounds.
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(banner.startswith(expected_prefix),
                    (banner, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000478
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    raw_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(raw_sock)
    ref = weakref.ref(wrapped)
    # Dropping the only strong reference must free the object at once;
    # a ResourceWarning raised while doing so is tolerated.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000489
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object, while dup() and
    # the *msg family are expected to raise NotImplementedError.
    raw_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(raw_sock) as ss:
        expectations = (
            (OSError, ss.recv, (1,)),
            (OSError, ss.recv_into, (bytearray(b'x'),)),
            (OSError, ss.recvfrom, (1,)),
            (OSError, ss.recvfrom_into, (bytearray(b'x'), 1)),
            (OSError, ss.send, (b'x',)),
            (OSError, ss.sendto, (b'x', ('0.0.0.0', 0))),
            (NotImplementedError, ss.dup, ()),
            (NotImplementedError, ss.sendmsg,
             ([b'x'], (), 0, ('0.0.0.0', 0))),
            (NotImplementedError, ss.recvmsg, (100,)),
            (NotImplementedError, ss.recvmsg_into, ([bytearray(100)],)),
        )
        for exc_type, method, args in expectations:
            self.assertRaises(exc_type, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000507
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for expected_timeout in (None, 0.0, 5.0):
        raw_sock = socket.socket(socket.AF_INET)
        raw_sock.settimeout(expected_timeout)
        with test_wrap_socket(raw_sock) as wrapped:
            self.assertEqual(expected_timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000516
def test_errors_sslwrap(self):
    # Argument validation performed by ssl.wrap_socket().
    sock = socket.socket()
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)
    server_side_msg = "certfile must be specified for server-side operations"
    with self.assertRaisesRegex(ValueError, server_side_msg):
        ssl.wrap_socket(sock, server_side=True)
    with self.assertRaisesRegex(ValueError, server_side_msg):
        ssl.wrap_socket(sock, server_side=True, certfile="")
    # A server-side socket must refuse to connect().
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))
    # A missing certificate and/or key file must surface as OSError
    # with errno ENOENT.
    missing_file_cases = (
        dict(certfile=NONEXISTINGCERT),
        dict(certfile=CERTFILE, keyfile=NONEXISTINGCERT),
        dict(certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT),
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000545
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the name next to this module, falling back to the current
    # directory when __file__ has no directory part.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    self.assertRaises(ssl.SSLError, test_wrap_socket, sock,
                      certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000555
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    # Delegates to bad_cert_test, which expects ssl.SSLError.
    self.bad_cert_test(certfile="nullcert.pem")
559
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    # Delegates to bad_cert_test, which expects ssl.SSLError.
    self.bad_cert_test(certfile="badcert.pem")
563
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    # Delegates to bad_cert_test, which expects ssl.SSLError.
    self.bad_cert_test(certfile="badkey.pem")
567
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000568 def test_match_hostname(self):
569 def ok(cert, hostname):
570 ssl.match_hostname(cert, hostname)
571 def fail(cert, hostname):
572 self.assertRaises(ssl.CertificateError,
573 ssl.match_hostname, cert, hostname)
574
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100575 # -- Hostname matching --
576
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000577 cert = {'subject': ((('commonName', 'example.com'),),)}
578 ok(cert, 'example.com')
579 ok(cert, 'ExAmple.cOm')
580 fail(cert, 'www.example.com')
581 fail(cert, '.example.com')
582 fail(cert, 'example.org')
583 fail(cert, 'exampleXcom')
584
585 cert = {'subject': ((('commonName', '*.a.com'),),)}
586 ok(cert, 'foo.a.com')
587 fail(cert, 'bar.foo.a.com')
588 fail(cert, 'a.com')
589 fail(cert, 'Xa.com')
590 fail(cert, '.a.com')
591
Mandeep Singhede2ac92017-11-27 04:01:27 +0530592 # only match wildcards when they are the only thing
593 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000594 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530595 fail(cert, 'foo.com')
596 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000597 fail(cert, 'bar.com')
598 fail(cert, 'foo.a.com')
599 fail(cert, 'bar.foo.com')
600
Christian Heimes824f7f32013-08-17 00:54:47 +0200601 # NULL bytes are bad, CVE-2013-4073
602 cert = {'subject': ((('commonName',
603 'null.python.org\x00example.org'),),)}
604 ok(cert, 'null.python.org\x00example.org') # or raise an error?
605 fail(cert, 'example.org')
606 fail(cert, 'null.python.org')
607
Georg Brandl72c98d32013-10-27 07:16:53 +0100608 # error cases with wildcards
609 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
610 fail(cert, 'bar.foo.a.com')
611 fail(cert, 'a.com')
612 fail(cert, 'Xa.com')
613 fail(cert, '.a.com')
614
615 cert = {'subject': ((('commonName', 'a.*.com'),),)}
616 fail(cert, 'a.foo.com')
617 fail(cert, 'a..com')
618 fail(cert, 'a.com')
619
620 # wildcard doesn't match IDNA prefix 'xn--'
621 idna = 'püthon.python.org'.encode("idna").decode("ascii")
622 cert = {'subject': ((('commonName', idna),),)}
623 ok(cert, idna)
624 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
625 fail(cert, idna)
626 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
627 fail(cert, idna)
628
629 # wildcard in first fragment and IDNA A-labels in sequent fragments
630 # are supported.
631 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
632 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530633 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
634 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100635 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
636 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
637
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000638 # Slightly fake real-world example
639 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
640 'subject': ((('commonName', 'linuxfrz.org'),),),
641 'subjectAltName': (('DNS', 'linuxfr.org'),
642 ('DNS', 'linuxfr.com'),
643 ('othername', '<unsupported>'))}
644 ok(cert, 'linuxfr.org')
645 ok(cert, 'linuxfr.com')
646 # Not a "DNS" entry
647 fail(cert, '<unsupported>')
648 # When there is a subjectAltName, commonName isn't used
649 fail(cert, 'linuxfrz.org')
650
651 # A pristine real-world example
652 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
653 'subject': ((('countryName', 'US'),),
654 (('stateOrProvinceName', 'California'),),
655 (('localityName', 'Mountain View'),),
656 (('organizationName', 'Google Inc'),),
657 (('commonName', 'mail.google.com'),))}
658 ok(cert, 'mail.google.com')
659 fail(cert, 'gmail.com')
660 # Only commonName is considered
661 fail(cert, 'California')
662
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100663 # -- IPv4 matching --
664 cert = {'subject': ((('commonName', 'example.com'),),),
665 'subjectAltName': (('DNS', 'example.com'),
666 ('IP Address', '10.11.12.13'),
667 ('IP Address', '14.15.16.17'))}
668 ok(cert, '10.11.12.13')
669 ok(cert, '14.15.16.17')
670 fail(cert, '14.15.16.18')
671 fail(cert, 'example.net')
672
673 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100674 if hasattr(socket, 'AF_INET6'):
675 cert = {'subject': ((('commonName', 'example.com'),),),
676 'subjectAltName': (
677 ('DNS', 'example.com'),
678 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
679 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
680 ok(cert, '2001::cafe')
681 ok(cert, '2003::baba')
682 fail(cert, '2003::bebe')
683 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100684
685 # -- Miscellaneous --
686
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000687 # Neither commonName nor subjectAltName
688 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
689 'subject': ((('countryName', 'US'),),
690 (('stateOrProvinceName', 'California'),),
691 (('localityName', 'Mountain View'),),
692 (('organizationName', 'Google Inc'),))}
693 fail(cert, 'mail.google.com')
694
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200695 # No DNS entry in subjectAltName but a commonName
696 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
697 'subject': ((('countryName', 'US'),),
698 (('stateOrProvinceName', 'California'),),
699 (('localityName', 'Mountain View'),),
700 (('commonName', 'mail.google.com'),)),
701 'subjectAltName': (('othername', 'blabla'), )}
702 ok(cert, 'mail.google.com')
703
704 # No DNS entry subjectAltName and no commonName
705 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
706 'subject': ((('countryName', 'US'),),
707 (('stateOrProvinceName', 'California'),),
708 (('localityName', 'Mountain View'),),
709 (('organizationName', 'Google Inc'),)),
710 'subjectAltName': (('othername', 'blabla'),)}
711 fail(cert, 'google.com')
712
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000713 # Empty cert / no cert
714 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
715 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
716
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200717 # Issue #17980: avoid denials of service by refusing more than one
718 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100719 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
720 with self.assertRaisesRegex(
721 ssl.CertificateError,
722 "partial wildcards in leftmost label are not supported"):
723 ssl.match_hostname(cert, 'axxb.example.com')
724
725 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
726 with self.assertRaisesRegex(
727 ssl.CertificateError,
728 "wildcard can only be present in the leftmost label"):
729 ssl.match_hostname(cert, 'www.sub.example.com')
730
731 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
732 with self.assertRaisesRegex(
733 ssl.CertificateError,
734 "too many wildcards"):
735 ssl.match_hostname(cert, 'axxbxxc.example.com')
736
737 cert = {'subject': ((('commonName', '*'),),)}
738 with self.assertRaisesRegex(
739 ssl.CertificateError,
740 "sole wildcard without additional labels are not support"):
741 ssl.match_hostname(cert, 'host')
742
743 cert = {'subject': ((('commonName', '*.com'),),)}
744 with self.assertRaisesRegex(
745 ssl.CertificateError,
746 r"hostname 'com' doesn't match '\*.com'"):
747 ssl.match_hostname(cert, 'com')
748
749 # extra checks for _inet_paton()
750 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
751 with self.assertRaises(ValueError):
752 ssl._inet_paton(invalid)
753 for ipaddr in ['127.0.0.1', '192.168.0.1']:
754 self.assertTrue(ssl._inet_paton(ipaddr))
755 if hasattr(socket, 'AF_INET6'):
756 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
757 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200758
Antoine Pitroud5323212010-10-22 18:19:07 +0000759 def test_server_side(self):
760 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200761 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000762 with socket.socket() as sock:
763 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
764 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000765
def test_unknown_channel_binding(self):
    # Asking for an unknown channel binding type should raise ValueError.
    #
    # Both sockets are registered with addCleanup() as soon as they exist,
    # so they are released even when connect(), the wrap or the assertion
    # fails (a trailing close() call would be skipped in that case).
    s = socket.socket(socket.AF_INET)
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    # No handshake is performed: only the argument validation of
    # get_channel_binding() is exercised here.
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200777
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # For a known binding type, an unconnected socket reports None.
    client_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_sock) as wrapped:
        self.assertIsNone(wrapped.get_channel_binding("tls-unique"))

    # Same expectation when the socket is wrapped for the server side.
    server_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_sock, server_side=True,
                          certfile=CERTFILE) as wrapped:
        self.assertIsNone(wrapped.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200789
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket should
    # emit a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Release the only reference, then force a collection.
        wrapped = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
797
def test_get_default_verify_paths(self):
    # The result is a 6-field DefaultVerifyPaths instance.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # With SSL_CERT_DIR / SSL_CERT_FILE set, cafile and capath must
    # reflect those values.
    with support.EnvironmentVarGuard() as guarded_env:
        guarded_env["SSL_CERT_DIR"] = CAPATH
        guarded_env["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
809
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    # Both system stores must yield a non-empty result.
    self.assertTrue(ssl.enum_certificates("CA"))
    self.assertTrue(ssl.enum_certificates("ROOT"))

    # Missing argument and empty store name are rejected.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Validate the shape of every entry and gather all trust OIDs seen.
    seen_oids = set()
    for store_name in ("CA", "ROOT"):
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            der_bytes, encoding, trust = entry
            self.assertIsInstance(der_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_oids |= trust

    # The serverAuth OID must appear in at least one entry.
    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200834
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    # The "CA" store must yield a non-empty result.
    self.assertTrue(ssl.enum_crls("CA"))
    # Missing argument and empty store name are rejected.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Every entry is a (bytes, encoding-name) pair.
    crl_entries = ssl.enum_crls("CA")
    self.assertIsInstance(crl_entries, list)
    for entry in crl_entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        payload, encoding = entry[0], entry[1]
        self.assertIsInstance(payload, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200848
Christian Heimes46bebee2013-06-09 19:03:31 +0200849
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100850 def test_asn1object(self):
851 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
852 '1.3.6.1.5.5.7.3.1')
853
854 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
855 self.assertEqual(val, expected)
856 self.assertEqual(val.nid, 129)
857 self.assertEqual(val.shortname, 'serverAuth')
858 self.assertEqual(val.longname, 'TLS Web Server Authentication')
859 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
860 self.assertIsInstance(val, ssl._ASN1Object)
861 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
862
863 val = ssl._ASN1Object.fromnid(129)
864 self.assertEqual(val, expected)
865 self.assertIsInstance(val, ssl._ASN1Object)
866 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100867 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
868 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100869 for i in range(1000):
870 try:
871 obj = ssl._ASN1Object.fromnid(i)
872 except ValueError:
873 pass
874 else:
875 self.assertIsInstance(obj.nid, int)
876 self.assertIsInstance(obj.shortname, str)
877 self.assertIsInstance(obj.longname, str)
878 self.assertIsInstance(obj.oid, (str, type(None)))
879
880 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
881 self.assertEqual(val, expected)
882 self.assertIsInstance(val, ssl._ASN1Object)
883 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
884 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
885 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100886 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
887 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100888
Christian Heimes72d28502013-11-23 13:56:58 +0100889 def test_purpose_enum(self):
890 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
891 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
892 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
893 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
894 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
895 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
896 '1.3.6.1.5.5.7.3.1')
897
898 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
899 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
900 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
901 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
902 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
903 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
904 '1.3.6.1.5.5.7.3.2')
905
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must fail with NotImplementedError,
    # both through the test helper and through SSLContext.wrap_socket().
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
916
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that `timestring` converts to exactly `timestamp`.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
919
def cert_time_fail(self, timestring):
    # Helper: assert that `timestring` is rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
923
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, epoch_seconds in known_conversions:
        self.cert_time_ok(cert_time, epoch_seconds)
931
932 def test_cert_time_to_seconds(self):
933 timestring = "Jan 5 09:34:43 2018 GMT"
934 ts = 1515144883.0
935 self.cert_time_ok(timestring, ts)
936 # accept keyword parameter, assert its name
937 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
938 # accept both %e and %d (space or zero generated by strftime)
939 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
940 # case-insensitive
941 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
942 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
943 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
944 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
945 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
946 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
947 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
948 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
949
950 newyear_ts = 1230768000.0
951 # leap seconds
952 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
953 # same timestamp
954 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
955
956 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
957 # allow 60th second (even if it is not a leap second)
958 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
959 # allow 2nd leap second for compatibility with time.strptime()
960 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
961 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
962
Mike53f7a7c2017-12-14 14:04:53 +0300963 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200964 # 99991231235959Z (rfc 5280)
965 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
966
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    def localized_feb():
        # Abbreviated month name for month 2 as formatted by strftime.
        return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    # The check is only meaningful when strftime yields something other
    # than the C-locale abbreviation.
    if localized_feb().lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    localized_time = localized_feb() + " 9 00:00:00 2007 GMT"
    self.cert_time_fail(localized_time)
981
def test_connect_ex_error(self):
    # connect_ex() against a bound but non-listening port must return
    # one of the expected errno values.
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
997
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100998
Antoine Pitrou152efa22010-05-16 18:19:27 +0000999class ContextTests(unittest.TestCase):
1000
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # Without arguments the context uses PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        self.assertRaises(ValueError, ssl.SSLContext, bogus)
1008
def test_protocol(self):
    # The ``protocol`` attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
1013
1014 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001016 ctx.set_ciphers("ALL")
1017 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001018 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001019 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001020
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of the enabled cipher suite names may mention these
    # algorithm families.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    banned_tokens = ("PSK", "SRP", "MD5", "RC4", "3DES")
    for suite in context.get_ciphers():
        suite_name = suite['name']
        for token in banned_tokens:
            self.assertNotIn(token, suite_name)
1033
Christian Heimes25bfcd52016-09-06 00:04:45 +02001034 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1035 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001036 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001037 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001038 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001039 self.assertIn('AES256-GCM-SHA384', names)
1040 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001041
def test_options(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    # SSLContext also enables these by default
    expected |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                 OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(expected, context.options)

    # Adding a flag is always possible.
    context.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, context.options)

    if not can_clear_options():
        # Clearing options is refused when unsupported.
        with self.assertRaises(ValueError):
            context.options = 0
        return

    # Removing the flag again restores the default set.
    context.options = (context.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, context.options)
    context.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, context.options & ~ssl.OP_NO_SSLv3)
1062
Christian Heimesa170fa12017-09-15 20:27:30 +02001063 def test_verify_mode_protocol(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001065 # Default value
1066 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1067 ctx.verify_mode = ssl.CERT_OPTIONAL
1068 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1069 ctx.verify_mode = ssl.CERT_REQUIRED
1070 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1071 ctx.verify_mode = ssl.CERT_NONE
1072 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1073 with self.assertRaises(TypeError):
1074 ctx.verify_mode = None
1075 with self.assertRaises(ValueError):
1076 ctx.verify_mode = 42
1077
Christian Heimesa170fa12017-09-15 20:27:30 +02001078 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1079 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1080 self.assertFalse(ctx.check_hostname)
1081
1082 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1083 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1084 self.assertTrue(ctx.check_hostname)
1085
Christian Heimes61d478c2018-01-27 15:51:38 +01001086 def test_hostname_checks_common_name(self):
1087 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1088 self.assertTrue(ctx.hostname_checks_common_name)
1089 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1090 ctx.hostname_checks_common_name = True
1091 self.assertTrue(ctx.hostname_checks_common_name)
1092 ctx.hostname_checks_common_name = False
1093 self.assertFalse(ctx.hostname_checks_common_name)
1094 ctx.hostname_checks_common_name = True
1095 self.assertTrue(ctx.hostname_checks_common_name)
1096 else:
1097 with self.assertRaises(AttributeError):
1098 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001099
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Exercise the minimum_version / maximum_version attributes: their
    # defaults, explicit assignments, the MINIMUM_SUPPORTED /
    # MAXIMUM_SUPPORTED values, and the error cases.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
    # Fedora override the setting to TLS 1.0.
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.MINIMUM_SUPPORTED,
         # Fedora 29 uses TLS 1.0 by default
         ssl.TLSVersion.TLSv1,
         # RHEL 8 uses TLS 1.2 by default
         ssl.TLSVersion.TLSv1_2}
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Explicit versions are stored and read back unchanged.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_1
    ctx.maximum_version = ssl.TLSVersion.TLSv1_2
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.TLSv1_1
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1_2
    )

    # MINIMUM_SUPPORTED assigned to minimum_version reads back as-is.
    ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    ctx.maximum_version = ssl.TLSVersion.TLSv1
    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.TLSv1
    )

    # MAXIMUM_SUPPORTED assigned to maximum_version reads back as-is.
    ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )

    # Assigning MINIMUM_SUPPORTED to maximum_version reads back as a
    # concrete version; either of the two listed values is accepted.
    ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    self.assertIn(
        ctx.maximum_version,
        {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
    )

    # Likewise MAXIMUM_SUPPORTED assigned to minimum_version reads back
    # as a concrete version.
    ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
    self.assertIn(
        ctx.minimum_version,
        {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
    )

    # A value that is not a TLSVersion member is rejected.
    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created for one fixed protocol version reports the
    # generic defaults and refuses any assignment to either bound.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertEqual(
        ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
    )
    self.assertEqual(
        ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
    )
    with self.assertRaises(ValueError):
        ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        ctx.maximum_version = ssl.TLSVersion.TLSv1
1168
1169
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Single flags are stored and read back unchanged.
    for flag in (ssl.VERIFY_CRL_CHECK_LEAF,
                 ssl.VERIFY_CRL_CHECK_CHAIN,
                 ssl.VERIFY_DEFAULT):
        context.verify_flags = flag
        self.assertEqual(context.verify_flags, flag)
    # supports any value
    combined = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
    context.verify_flags = combined
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    # Non-integer values are rejected.
    with self.assertRaises(TypeError):
        context.verify_flags = None
1189
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, mismatching pairs, and the various accepted and
    # rejected forms of the ``password`` argument.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    # certfile is mandatory
    self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
    # A missing file surfaces as OSError with errno ENOENT
    with self.assertRaises(OSError) as cm:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # Malformed and empty files are reported as SSLError
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(BADCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(EMPTYCERT)
    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # Only one half supplied, or the two halves swapped, must fail
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
    # Password protected key and cert
    # (the password is accepted as str, bytes or bytearray, both by
    # keyword and positionally)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=bytearray(KEY_PASSWORD.encode()))
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                        bytearray(KEY_PASSWORD.encode()))
    # Wrong type, wrong value and oversized passwords are rejected
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
    # Password callback
    # (one callable per return type / failure mode exercised below)
    def getpass_unicode():
        return KEY_PASSWORD
    def getpass_bytes():
        return KEY_PASSWORD.encode()
    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())
    def getpass_badpass():
        return "badpass"
    def getpass_huge():
        return b'a' * (1024 * 1024)
    def getpass_bad_type():
        return 9
    def getpass_exception():
        raise Exception('getpass error')
    # Callable instance and bound method variants of the callback
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=GetPassCallable().getpass)
    # Failure modes of the callback
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    # An exception raised inside the callback propagates to the caller
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001272
def test_load_verify_locations(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The CA file may be given positionally or by keyword, as str or
    # bytes.
    context.load_verify_locations(CERTFILE)
    context.load_verify_locations(cafile=CERTFILE, capath=None)
    context.load_verify_locations(BYTES_CERTFILE)
    context.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
    # At least one real location is required.
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    # A missing file surfaces as OSError with errno ENOENT.
    with self.assertRaises(OSError) as caught:
        context.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    # A malformed file is reported as SSLError.
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)
    # File and directory together, directory as str or bytes.
    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
1291
def test_load_verify_cadata(self):
    # Check the ``cadata`` argument of load_verify_locations() with PEM
    # text, DER bytes, concatenated input and malformed input.
    def read_text(path):
        with open(path) as stream:
            return stream.read()

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    cacert_pem = read_text(CAFILE_CACERT)
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_pem = read_text(CAFILE_NEURONIO)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # PEM, one certificate per call
    context = new_context()
    assert_ca_count(context, 0)
    context.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(context, 1)
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)
    # loading the same certificate again must not change the count
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)

    # PEM, both certificates in a single string
    context = new_context()
    context.load_verify_locations(
        cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(context, 2)

    # PEM, with junk around the certs and one duplicate
    context = new_context()
    pieces = ("head", cacert_pem, "other", neuronio_pem, "again",
              neuronio_pem, "tail")
    context.load_verify_locations(cadata="\n".join(pieces))
    assert_ca_count(context, 2)

    # DER, one certificate per call
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # loading the same certificate again must not change the count
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # DER, both certificates concatenated
    context = new_context()
    context.load_verify_locations(
        cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(context, 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    self.assertRaisesRegex(ssl.SSLError, "no start line",
                           context.load_verify_locations, cadata="broken")
    self.assertRaisesRegex(ssl.SSLError, "not enough data",
                           context.load_verify_locations, cadata=b"broken")
1348
1349
def test_load_dh_params(self):
    # load_dh_params() accepts a path and rejects missing arguments,
    # missing files and files that hold no DH parameters.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    load = context.load_dh_params

    load(DHFILE)
    on_windows = os.name == 'nt'
    if not on_windows:
        # the bytes path is only exercised off Windows
        load(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        load()
    with self.assertRaises(TypeError):
        load(None)

    with self.assertRaises(FileNotFoundError) as missing:
        load(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # a certificate file is not a DH parameter file
    with self.assertRaises(ssl.SSLError):
        load(CERTFILE)
1362
def test_session_stats(self):
    # A freshly created context must report all-zero session statistics,
    # for every protocol listed in PROTOCOLS.
    counters = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    expected = dict.fromkeys(counters, 0)
    for proto in PROTOCOLS:
        # subTest() names the failing protocol in the report and lets the
        # remaining protocols be checked instead of stopping at the first
        # failure.
        with self.subTest(proto=proto):
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), expected)
1379
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001380 def test_set_default_verify_paths(self):
1381 # There's not much we can do to test that it acts as expected,
1382 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001383 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001384 ctx.set_default_verify_paths()
1385
Antoine Pitrou501da612011-12-21 09:27:41 +01001386 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001387 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001388 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001389 ctx.set_ecdh_curve("prime256v1")
1390 ctx.set_ecdh_curve(b"prime256v1")
1391 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1392 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1393 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1394 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1395
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    # Both None and a real callable must be accepted.
    for accepted in (None, noop_callback):
        context.set_servername_callback(accepted)
1410
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback hold a reference to the
    # context it is about to be registered on.
    def callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)

    # Drop every local reference, then let the collector run.
    del context, callback
    gc.collect()
    self.assertIsNone(context_ref())
1423
def test_cert_store_stats(self):
    # Follow cert_store_stats() while certificates are loaded step by step.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509, x509_ca):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    assert_stats(x509=0, x509_ca=0)

    # load_cert_chain() is expected to leave the counters untouched
    context.load_cert_chain(CERTFILE)
    assert_stats(x509=0, x509_ca=0)

    # CERTFILE is counted as a certificate but not as a CA
    context.load_verify_locations(CERTFILE)
    assert_stats(x509=1, x509_ca=0)

    # CAFILE_CACERT raises both counters
    context.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509=2, x509_ca=1)
1437
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(context.get_ca_certs(), [])

    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])

    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    loaded = context.get_ca_certs()

    # The expected issuer and subject are the same name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(loaded, [expected_cert])

    # With a true argument the certificates come back DER-encoded.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1465
Christian Heimes72d28502013-11-23 13:56:58 +01001466 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001467 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001468 ctx.load_default_certs()
1469
Christian Heimesa170fa12017-09-15 20:27:30 +02001470 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001471 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1472 ctx.load_default_certs()
1473
Christian Heimesa170fa12017-09-15 20:27:30 +02001474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001475 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1476
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001478 self.assertRaises(TypeError, ctx.load_default_certs, None)
1479 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1480
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR/SSL_CERT_FILE pointing at the test data,
    # load_default_certs() is expected to load exactly one certificate.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (
        ("SSL_CERT_DIR", CAPATH),
        ("SSL_CERT_FILE", CERTFILE),
    )
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as env:
        for name, value in overrides:
            env[name] = value
        context.load_default_certs()
        self.assertEqual(context.cert_store_stats(), expected_stats)
1490
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what load_default_certs() loads without the env overrides.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_context.load_default_certs()
    baseline = baseline_context.cert_store_stats()

    # With the overrides in place, exactly one more certificate is expected.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        for name, value in (("SSL_CERT_DIR", CAPATH),
                            ("SSL_CERT_FILE", CERTFILE)):
            env[name] = value
        context.load_default_certs()
        expected = dict(baseline, x509=baseline["x509"] + 1)
        self.assertEqual(context.cert_store_stats(), expected)
1505
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set in ctx.options.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

    # The remaining flags are only checked when their constant is non-zero.
    conditional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in conditional_flags:
        if flag == 0:
            continue
        self.assertEqual(ctx.options & flag, flag)
1520
def test_create_default_context(self):
    # Check protocol, verify mode and options of the contexts returned by
    # create_default_context() for several argument combinations.
    def assert_protocol_and_mode(context, verify_mode):
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(context.verify_mode, verify_mode)

    # default arguments
    context = ssl.create_default_context()
    assert_protocol_and_mode(context, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # explicit cafile, capath and cadata
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_text)
    assert_protocol_and_mode(context, ssl.CERT_REQUIRED)
    self._assert_context_options(context)

    # CLIENT_AUTH purpose
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    assert_protocol_and_mode(context, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001541
def test__create_stdlib_context(self):
    # Check protocol, verify mode, hostname checking and options of the
    # contexts returned by ssl._create_stdlib_context().
    def verify(context, protocol, verify_mode, check_hostname=None):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        # check_hostname is only asserted when an expectation is given
        if check_hostname is not None:
            if check_hostname:
                self.assertTrue(context.check_hostname)
            else:
                self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    # default arguments
    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, check_hostname=False)

    # explicit protocol
    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # explicit protocol with certificate and hostname verification
    strict = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        check_hostname=True)
    verify(strict, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
           check_hostname=True)

    # CLIENT_AUTH purpose
    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001566
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001568 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001569 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001570 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571
Christian Heimese82c0342017-09-15 20:29:57 +02001572 # Auto set CERT_REQUIRED
1573 ctx.check_hostname = True
1574 self.assertTrue(ctx.check_hostname)
1575 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1576 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001577 ctx.verify_mode = ssl.CERT_REQUIRED
1578 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001579 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001580
Christian Heimese82c0342017-09-15 20:29:57 +02001581 # Changing verify_mode does not affect check_hostname
1582 ctx.check_hostname = False
1583 ctx.verify_mode = ssl.CERT_NONE
1584 ctx.check_hostname = False
1585 self.assertFalse(ctx.check_hostname)
1586 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1587 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001588 ctx.check_hostname = True
1589 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001590 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1591
1592 ctx.check_hostname = False
1593 ctx.verify_mode = ssl.CERT_OPTIONAL
1594 ctx.check_hostname = False
1595 self.assertFalse(ctx.check_hostname)
1596 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1597 # keep CERT_OPTIONAL
1598 ctx.check_hostname = True
1599 self.assertTrue(ctx.check_hostname)
1600 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001601
1602 # Cannot set CERT_NONE with check_hostname enabled
1603 with self.assertRaises(ValueError):
1604 ctx.verify_mode = ssl.CERT_NONE
1605 ctx.check_hostname = False
1606 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001607 ctx.verify_mode = ssl.CERT_NONE
1608 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001609
Christian Heimes5fe668c2016-09-12 00:01:11 +02001610 def test_context_client_server(self):
1611 # PROTOCOL_TLS_CLIENT has sane defaults
1612 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1613 self.assertTrue(ctx.check_hostname)
1614 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1615
1616 # PROTOCOL_TLS_SERVER has different but also sane defaults
1617 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1618 self.assertFalse(ctx.check_hostname)
1619 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1620
Christian Heimes4df60f12017-09-15 20:26:05 +02001621 def test_context_custom_class(self):
1622 class MySSLSocket(ssl.SSLSocket):
1623 pass
1624
1625 class MySSLObject(ssl.SSLObject):
1626 pass
1627
1628 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1629 ctx.sslsocket_class = MySSLSocket
1630 ctx.sslobject_class = MySSLObject
1631
1632 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1633 self.assertIsInstance(sock, MySSLSocket)
1634 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1635 self.assertIsInstance(obj, MySSLObject)
1636
Antoine Pitrou152efa22010-05-16 18:19:27 +00001637
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError, its subclasses and bad server hostnames."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            raw = socket.socket()
            # Make sure the plain socket is closed even if connect(),
            # setblocking() or wrap_socket() raises before the SSL socket
            # takes it over; closing it again afterwards is harmless.
            self.addCleanup(raw.close)
            raw.connect(listener.getsockname())
            raw.setblocking(False)
            with ctx.wrap_socket(raw, False, do_handshake_on_connect=False) as c:
                # Nothing answers on the listening side, so the
                # non-blocking handshake cannot complete.
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    c.do_handshake()
                s = str(cm.exception)
                self.assertTrue(s.startswith("The operation did not complete (read)"), s)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # Empty and leading-dot hostnames raise ValueError; a hostname
        # with an embedded NUL raises TypeError.
        ctx = ssl.create_default_context()
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="")
        with self.assertRaises(ValueError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname=".example.org")
        with self.assertRaises(TypeError):
            ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                         server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001692
1693
class MemoryBIOTests(unittest.TestCase):
    """Tests for the ssl.MemoryBIO in-memory buffer."""

    def test_read_write(self):
        # Data comes back in write order; an empty BIO reads as b''.
        mem = ssl.MemoryBIO()

        mem.write(b'foo')
        self.assertEqual(mem.read(), b'foo')
        self.assertEqual(mem.read(), b'')

        # consecutive writes are concatenated
        for piece in (b'foo', b'bar'):
            mem.write(piece)
        self.assertEqual(mem.read(), b'foobar')
        self.assertEqual(mem.read(), b'')

        # sized reads return at most the requested number of bytes
        mem.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(mem.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() was called AND the
        # buffered data has been read completely.
        mem = ssl.MemoryBIO()
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertFalse(mem.eof)

        mem.write(b'foo')
        self.assertFalse(mem.eof)
        mem.write_eof()
        self.assertFalse(mem.eof)

        self.assertEqual(mem.read(2), b'fo')
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(1), b'o')
        self.assertTrue(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertTrue(mem.eof)

    def test_pending(self):
        # pending follows the number of buffered bytes.
        mem = ssl.MemoryBIO()
        self.assertEqual(mem.pending, 0)
        mem.write(b'foo')
        self.assertEqual(mem.pending, 3)

        # drain one byte at a time
        for remaining in (2, 1, 0):
            mem.read(1)
            self.assertEqual(mem.pending, remaining)

        # refill one byte at a time
        for buffered in (1, 2, 3):
            mem.write(b'x')
            self.assertEqual(mem.pending, buffered)

        mem.read()
        self.assertEqual(mem.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview.
        mem = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            mem.write(payload)
            self.assertEqual(mem.read(), expected)

    def test_error_types(self):
        # write() rejects str, None, bool and int with TypeError.
        mem = ssl.MemoryBIO()
        for rejected in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                mem.write(rejected)
1755
1756
class SSLObjectTests(unittest.TestCase):
    """Tests for ssl.SSLObject driven through MemoryBIO pairs."""

    def test_private_init(self):
        # SSLObject cannot be instantiated directly.
        buffer = ssl.MemoryBIO()
        self.assertRaisesRegex(TypeError, "public constructor",
                               ssl.SSLObject, buffer, buffer)

    def test_unwrap(self):
        client_ctx, server_ctx, hostname = testing_context()
        c_in, c_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        s_in, s_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
        server = server_ctx.wrap_bio(s_in, s_out, server_side=True)

        def handshake_step(endpoint, outgoing, peer_incoming):
            # Advance one side's handshake, then hand whatever it produced
            # to the other side's incoming BIO.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass
            if outgoing.pending:
                peer_incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            handshake_step(client, c_out, s_in)
            handshake_step(server, s_out, c_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send close-notify
        # and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the client's
        # close-notify:
        s_in.write(c_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it doesn't
        # raise either.
        c_in.write(s_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001804
Martin Panter3840b2a2016-03-27 01:53:46 +00001805class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001806 """Tests that connect to a simple server running in the background"""
1807
def setUp(self):
    # Start a ThreadedEchoServer for the test and stop it again on cleanup.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    # __exit__ is called as if the block had finished without an exception
    no_exception = (None, None, None)
    self.addCleanup(echo_server.__exit__, *no_exception)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001813
def test_connect(self):
    # Without verification the peer certificate is reported as empty.
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001828
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001838
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    plain = socket.socket(socket.AF_INET)
    conn = test_wrap_socket(plain,
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    # connect_ex() reports success as 0
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001847
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], 5.0)

    # Non-blocking handshake: retry until it stops asking for more I/O
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], 5.0)
        else:
            handshake_done = True

    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001873
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrap(**kwargs):
        return context.wrap_socket(socket.socket(socket.AF_INET), **kwargs)

    with wrap() as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())

    # Same with a server hostname
    with wrap(server_hostname="dummy") as conn:
        conn.connect(self.server_addr)

    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrap() as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1891
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001902
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The same scenario runs with a str path, then with a bytes path.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001925
def test_connect_cadata(self):
    # Verify the server certificate using CA data handed over in memory
    # through `cadata`: once as PEM text, once converted to DER.
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for cadata in (pem, der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001946
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: a file-like object obtained from makefile() must not
    # postpone the closing of the underlying "real socket".  The check is
    # done on the raw file descriptor, which is why Windows is skipped.
    tls_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    tls_sock.connect(self.server_addr)
    raw_fd = tls_sock.fileno()
    tls_sock.makefile().close()
    # Closing the file object alone leaves the descriptor usable.
    os.read(raw_fd, 0)
    # Closing the SSL socket itself must release the descriptor.
    tls_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(raw_fd, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001965
def test_non_blocking_handshake(self):
    # Drive the handshake by hand on a non-blocking socket, waiting with
    # select() each time OpenSSL reports it needs to read or write.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)
    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [tls_sock], [])
        else:
            handshake_done = True
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001986
def test_get_server_certificate(self):
    # Fetch the test server's certificate through the shared helper,
    # passing the signing CA for the verified retrieval.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001989
def test_get_server_certificate_fail(self):
    # A failed connection crashes ThreadedEchoServer, which is why the
    # failure case gets a test method of its own.
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001994
def test_ciphers(self):
    # Both of these cipher strings must allow a connection.
    for cipher_string in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_string) as conn:
            conn.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"), \
            socket.socket(socket.AF_INET) as sock:
        rejected = test_wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
        rejected.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00002008
def test_get_ca_certs_capath(self):
    # Certificates from a capath directory are loaded on request: the
    # list is empty before the connection and holds one entry after it.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])
    plain_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(plain_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002020
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    contexts = []
    for _ in range(2):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(capath=CAPATH)
        contexts.append(context)
    initial_ctx, replacement_ctx = contexts
    plain_sock = socket.socket(socket.AF_INET)
    with initial_ctx.wrap_socket(plain_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        # Both the Python-level wrapper and the low-level object must
        # report the context the socket was created with ...
        self.assertIs(conn.context, initial_ctx)
        self.assertIs(conn._sslobj.context, initial_ctx)
        # ... and follow the assignment of a new one.
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002036
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run ``func(*args)`` to completion, shuttling data between BIOs and socket.

    *func* is retried for as long as it raises an ``ssl.SSLError`` whose
    errno is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.  After every
    attempt whatever *outgoing* holds is sent on *sock*; on WANT_READ, data
    received from *sock* is written to *incoming* (or EOF is signalled on
    it when the peer sent nothing).

    Only the ``timeout`` keyword (seconds, default 10) is read from
    *kwargs*; it is not forwarded to *func*.  When the deadline passes,
    the test is failed through ``self.fail``.

    Returns the value returned by *func*.  Any other ``ssl.SSLError`` is
    propagated unchanged.
    """
    timeout = kwargs.get('timeout', 10)
    deadline = time.monotonic() + timeout
    count = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        # Named `want` rather than `errno` so that the file-level `errno`
        # module import is not shadowed inside this function.
        want = None
        count += 1
        try:
            ret = func(*args)
        except ssl.SSLError as e:
            if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                               ssl.SSL_ERROR_WANT_WRITE):
                raise
            want = e.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        buf = outgoing.read()
        sock.sendall(buf)
        # If there's no error, we're done. For WANT_READ, we need to get
        # data from the socket and put it in the incoming BIO.
        if want is None:
            break
        elif want == ssl.SSL_ERROR_WANT_READ:
            buf = sock.recv(32768)
            if buf:
                incoming.write(buf)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (count, func.__name__))
    return ret
2073
def test_bio_handshake(self):
    # Handshake and shutdown over memory BIOs, checking what the SSL
    # object reports before and after the handshake.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    sslobj = context.wrap_bio(incoming, outgoing, False,
                              SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake.
    self.assertIs(sslobj._sslobj.owner, sslobj)
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)

    # State after the handshake.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
2107
def test_bio_read_write_data(self):
    # Send a request through memory BIOs and check the reply read back
    # from the server.
    sock = socket.socket(socket.AF_INET)
    self.addCleanup(sock.close)
    sock.connect(self.server_addr)
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    sslobj = context.wrap_bio(incoming, outgoing, False)

    def pump(operation, *op_args):
        # Every step goes through the same socket/BIO trio.
        return self.ssl_io_loop(sock, incoming, outgoing,
                                operation, *op_args)

    pump(sslobj.do_handshake)
    request = b'FOO\n'
    pump(sslobj.write, request)
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002123
2124
class NetworkedTests(unittest.TestCase):
    """Tests that talk to hosts on the real network."""

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            expected_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(status, expected_errnos)

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate-retrieval helpers against an IPv6 host.
        with support.transient_internet('ipv6.google.com'):
            for check in (_test_get_server_certificate,
                          _test_get_server_certificate_fail):
                check(self, 'ipv6.google.com', 443)
2146
Martin Panter3840b2a2016-03-27 01:53:46 +00002147
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port without, then with, CA validation.

    *test* is failed when either retrieval yields an empty result.  The
    second retrieval passes *cert* as ``ca_certs``.
    """
    address = (host, port)
    for options in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **options)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2158
def _test_get_server_certificate_fail(test, host, port):
    """Check that retrieving the certificate with CERTFILE as CA fails.

    An ``ssl.SSLError`` is the expected outcome; *test* is failed when a
    certificate is returned instead.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # This is the outcome the test wants.
        if support.verbose:
            sys.stdout.write("%s\n" % err)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2168
2169
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002170from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002171
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used by the tests.

    The accept loop handles one connection at a time: each accepted
    socket is given to a ConnectionHandler thread which is joined before
    the next accept().  Data received is sent back lower-cased; a few
    command words (see ConnectionHandler.run) trigger special actions.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is not wrapped in TLS.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection in TLS using the server's context.

            Returns True on success.  On failure the error text is
            recorded in ``server.conn_errors``, the connection is closed
            and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                                     + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS layer if present, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write through the TLS layer if present, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if present, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an error.

            Recognised messages (after stripping whitespace): b'over',
            b'STARTTLS' / b'ENDTLS' (STARTTLS servers only),
            b'CB tls-unique', b'PHA', b'HASCERT' and b'GETCERT'.  Anything
            else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # Only attempt a TLS shutdown when a TLS layer
                        # exists.  On a STARTTLS server the connection may
                        # still be (or again be) plain text here, and
                        # calling unwrap() on None would raise an
                        # AttributeError that no handler below catches.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                                self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Bind a listening socket and prepare the SSL context.

        When *context* is given it is used as is and the other SSL
        arguments (*ssl_version*, *certreqs*, *cacerts*, *certificate*)
        are ignored; otherwise a context is built from them.
        *npn_protocols*, *alpn_protocols* and *ciphers* are applied to the
        context in either case.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results, appended to by ConnectionHandler.
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait until run() signals that it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # The short timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop to terminate."""
        self.active = False
2433
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread driving an asyncore-based TLS listener."""

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: TLS handshake, then lower-case echo."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Process data already pending on the SSL socket before
                # reporting the dispatcher as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting once it is done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed before OSError so that other SSL errors are
                    # not handled by the OSError clause below.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and wait until run() has signalled it is active.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run the asyncore loop until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Best effort: errors from the loop are ignored and the
                # loop is re-entered while the server is active.  Only
                # Exception is caught, so interpreter-exit signals are not
                # swallowed.
                pass

    def stop(self):
        """Stop the loop in run() and close the listening dispatcher."""
        self.active = False
        self.server.close()
2550 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002551
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Start an echo server, connect a client to it and exchange *indata*
    as bytes, bytearray and memoryview, checking every reply.

    Returns a dict of connection statistics gathered from both sides.
    """
    stats = {}
    log_client = connectionchatty and support.verbose
    expected = indata.lower()
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name,
                                                 session=session)
        with client_sock as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if log_client:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if log_client:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if log_client:
                sys.stdout.write(" client: closing connection.\n")
            # Client-side view of the connection, taken before closing.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        # Server-side results recorded by the connection handler.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002601
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Run one handshake between a *client_protocol* client and a
    *server_protocol* server and check the outcome.

    *expect_success* selects what is asserted:

    - a false value: the connection must fail, either with ssl.SSLError
      or with an OSError whose errno is ECONNRESET;
    - True: the connection must succeed;
    - any other true value (a string): the connection must succeed and
      the version reported for it must equal that value.

    *certsreqs* is the verify mode applied to both contexts (CERT_NONE
    when omitted).  *server_options* and *client_options* are OR-ed into
    the options of the respective context.  AssertionError is raised when
    the outcome does not match the expectation.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    verify_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    # Raises KeyError for a verify mode that is not listed above.
    certtype = verify_mode_names[certsreqs]
    if support.verbose:
        # Combinations that are expected to fail are printed in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        labels = (ssl.get_protocol_name(client_protocol),
                  ssl.get_protocol_name(server_protocol),
                  certtype)
        sys.stdout.write(formatstr % labels)

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
    # SSLContext.minimum_version is only available on recent OpenSSL
    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
    if min_version is not None and hasattr(server_context, 'minimum_version'):
        if (server_protocol == ssl.PROTOCOL_TLS
                and server_context.minimum_version > min_version):
            # A strict OpenSSL configuration may require a more recent TLS
            # version; lower the minimum so old TLS versions can be tested.
            server_context.minimum_version = min_version

    # NOTE: "ALL" ciphers must be enabled on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        # A protocol mismatch can surface as an SSLError ...
        if expect_success:
            raise
        return
    except OSError as exc:
        # ... or as a "Connection reset by peer" error.
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    # The handshake went through.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2671
2672
2673class ThreadedTests(unittest.TestCase):
2674
def test_echo(self):
    """Echo data through the server for every protocol and role pairing"""
    if support.verbose:
        sys.stdout.write("\n")

    # The role-specific protocols cannot serve both ends with one context;
    # they are exercised separately below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in (p for p in PROTOCOLS if p not in role_specific):
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Matching roles: expected to work.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Mismatched roles: each row is
    # (client label, server label, client side context,
    #  server side context, extra keyword arguments).
    # NOTE(review): the last row is labelled CLIENT/CLIENT but passes the
    # same contexts as the first row -- kept exactly as in the original.
    wrong_role = 'called a function you should not call'
    mismatches = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, as_client, as_server, extra in mismatches:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn(wrong_role, str(e.exception))
2721
def test_getpeercert(self):
    """Check getpeercert() before and after an explicit handshake"""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # Until the handshake is done, getpeercert() raises ValueError.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()

        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')

        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # The validity window must be present and correctly ordered.
        self.assertIn('notBefore', cert)
        self.assertIn('notAfter', cert)
        not_before = ssl.cert_time_to_seconds(cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002757
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Check VERIFY_CRL_CHECK_LEAF without and then with a loaded CRL"""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def assert_handshake_succeeds():
        # Start a fresh echo server, connect and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                client_context.wrap_socket(socket.socket(),
                                           server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    assert_handshake_succeeds()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    assert_handshake_succeeds()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002799
def test_check_hostname(self):
    """Check hostname matching for a right, a wrong and a missing name"""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # The matching hostname verifies.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # A hostname the certificate does not cover is rejected on connect.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as s:
        mismatch = "Hostname mismatch, certificate is not valid for 'invalid'."
        with self.assertRaisesRegex(ssl.CertificateError, mismatch):
            s.connect((HOST, server.port))

    # Omitting server_hostname is rejected when the socket is wrapped.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as s:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002832
def test_ecc_cert(self):
    """Connect to a server that only has an ECC certificate.

    The client restricts itself to ECDHE/ECDSA cipher suites and must
    still be able to verify the server and negotiate a matching suite.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) treated the tuple
            # as the failure message and could never fail.  TLS 1.3 is not
            # disabled in this test and its suite names (for example
            # TLS_AES_256_GCM_SHA384) have no key-exchange/authentication
            # prefix, so the name is only checked for older versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2853
def test_dual_rsa_ecc(self):
    """Connect to a server holding both an ECC and an RSA certificate.

    The client only offers ECDHE/ECDSA cipher suites, so the negotiated
    suite is expected to be an ECDHE-ECDSA one.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) treated the tuple
            # as the failure message and could never fail; compare the
            # name prefix for real.  str.split() returns a list, hence the
            # list on the right-hand side.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2879
def test_check_hostname_idn(self):
    """Check hostname matching for internationalized domain names"""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # Each entry maps the A-label expected in SSLSocket.server_hostname to
    # the spellings that must all resolve to it.
    spellings_by_alabel = (
        ('xn--knig-5qa.idn.pythontest.net', (
            'könig.idn.pythontest.net',
            'xn--knig-5qa.idn.pythontest.net',
            b'xn--knig-5qa.idn.pythontest.net',
        )),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net', (
            'königsgäßchen.idna2003.pythontest.net',
            'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
            b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
        )),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net', (
            # The U-label 'königsgäßchen.idna2008.pythontest.net' was
            # already commented out in the original list.
            'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
            b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
        )),
    )
    idn_hostnames = [
        (spelling, alabel)
        for alabel, spellings in spellings_by_alabel
        for spelling in spellings
    ]

    # Every spelling of a correct hostname must verify.
    for server_hostname, expected_hostname in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=server_hostname) as s:
            self.assertEqual(s.server_hostname, expected_hostname)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_hostname)
            self.assertTrue(cert, "Can't get peer certificate.")

    # A hostname that is not in the certificate must be rejected.
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2935
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    The server requires a client certificate (CERT_REQUIRED) and the
    client presents one that is not signed by the trusted CA; the
    connection attempt, capped at TLS 1.2, has to fail.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            try:
                # Either the server rejects the connection with an SSL
                # error, or the connection is reset at a lower level
                # (which sometimes happens on Windows).
                s.connect((HOST, server.port))
            except OSError as e:
                # ssl.SSLError is an OSError subclass, so tell them apart.
                if isinstance(e, ssl.SSLError):
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                elif e.errno != errno.ECONNRESET:
                    raise
                elif support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
2972
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """Same as test_wrong_cert_tls12, with both sides pinned to TLS 1.3

    The connect itself is expected to go through; the failure has to show
    up on the first write/read afterwards.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    server_context.minimum_version = ssl.TLSVersion.TLSv1_3
    client_context.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except OSError as e:
                # ssl.SSLError is an OSError subclass, so tell them apart.
                if isinstance(e, ssl.SSLError):
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                elif e.errno != errno.ECONNRESET:
                    raise
                elif support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % e)
            else:
                self.fail("Use of invalid cert should have failed!")
3003
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    accepting = threading.Event()
    hung_up = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a helper thread: accept a single connection, close it
        # and the listening socket straight away, then signal `hung_up`
        # so the main thread knows the socket is gone.
        listen_sock.listen()
        accepting.set()
        peer, _ = listen_sock.accept()
        peer.close()
        listen_sock.close()
        hung_up.set()

    thread = threading.Thread(target=listener)
    thread.start()
    try:
        # Client side: connect once the listener is ready, wait for it
        # to hang up, and only then try to wrap the socket.
        accepting.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            hung_up.wait()
            try:
                unexpected = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        thread.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003044
def test_ssl_cert_verify_error(self):
    """Check the details carried by a certificate verification error.

    The client context has no CA certificates loaded, so verifying the
    server certificate has to fail with SSLCertVerificationError carrying
    verify code 20 and the matching message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # Deliberately no load_verify_locations() on the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                msg = 'unable to get local issuer certificate'
                self.assertIsInstance(e, ssl.SSLCertVerificationError)
                self.assertEqual(e.verify_code, 20)
                self.assertEqual(e.verify_message, msg)
                self.assertIn(msg, repr(e))
                self.assertIn('certificate verify failed', repr(e))
            else:
                # Without this branch a handshake that unexpectedly
                # succeeds would let the test pass without checking
                # anything.
                self.fail("Connecting without a trusted CA should have "
                          "failed certificate verification")
3067
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 client against SSLv2 server, for every verify mode
    # (None lets try_protocol_combo() use its default).
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)

    # Other client protocols are expected to be refused.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003090
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols for every verify mode
    # (None lets try_protocol_combo() use its default).
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, certsreqs)
        try_protocol_combo(tls, tls, True, certsreqs)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3128
3129
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 on both sides must negotiate SSLv3 for every verify mode
    # (None lets try_protocol_combo() use its default).
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)

    # Every other client configuration is expected to be refused.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3148
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 on both sides must negotiate TLSv1 for every verify mode
    # (None lets try_protocol_combo() use its default).
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)

    # Legacy client protocols, where compiled in, are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)

    # So is a generic TLS client that has TLSv1 switched off.
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3162
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Legacy client protocols, where compiled in, are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)

    # So is a generic TLS client that has TLSv1.1 switched off.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # A generic TLS server accepts a TLSv1.1 client ...
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # ... but TLSv1.1 and TLSv1 do not interoperate in either direction.
    for server_protocol, client_protocol in (
            (tlsv1_1, ssl.PROTOCOL_TLSv1),
            (ssl.PROTOCOL_TLSv1, tlsv1_1)):
        try_protocol_combo(server_protocol, client_protocol, False)
3181
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)

    # Legacy client protocols, where compiled in, are expected to fail.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)

    # So is a generic TLS client that has TLSv1.2 switched off.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # A generic TLS server accepts a TLSv1.2 client ...
    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # ... but TLSv1.2 does not interoperate with TLSv1 or TLSv1.1 in
    # either direction.
    for server_protocol, client_protocol in (
            (tlsv1_2, ssl.PROTOCOL_TLSv1),
            (ssl.PROTOCOL_TLSv1, tlsv1_2),
            (tlsv1_2, ssl.PROTOCOL_TLSv1_1),
            (ssl.PROTOCOL_TLSv1_1, tlsv1_2)):
        try_protocol_combo(server_protocol, client_protocol, False)
3204
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    def report(template, *args):
        # Write a progress line, but only when running verbosely.
        if support.verbose:
            sys.stdout.write(template % args if args else template)

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through the TLS-wrapped socket `conn`
    # instead of the plain socket `s`.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        report("\n")
        for indata in msgs:
            report(" client: sending %r...\n", indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                report(" client: read %r from server, starting TLS...\n",
                       msg)
                conn = test_wrap_socket(s)
                wrapped = True
            elif indata == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                report(" client: read %r from server, ending TLS...\n",
                       msg)
                s = conn.unwrap()
                wrapped = False
            else:
                report(" client: read %r from server\n", msg)
        report(" client: closing connection.\n")
        # Say goodbye and close whichever channel is currently active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003261
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Reads CERTFILE from disk, fetches the same file name from the HTTPS
    test server started by make_https_server(), and checks that both byte
    strings are equal.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Same type as d1: if no body is read below, the final comparison is a
    # plain bytes mismatch rather than a bytes-vs-str comparison.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    # The response object is a context manager; it is closed on exit even
    # if reading fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003287
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    def report(fmt, *args):
        # Only chatter in verbose runs.
        if support.verbose:
            sys.stdout.write(fmt % args)

    report("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        report(" client: sending %r...\n", indata)
        s.write(indata)
        outdata = s.read()
        report(" client: read %r\n", outdata)
        # The reply must be the lower-cased request.
        if outdata != indata.lower():
            details = (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata))
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % details)
        s.write(b"over\n")
        report(" client: closing connection.\n")
        s.close()
        report(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003316
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv variant is tried against the echo server.  Variants
    flagged as expected to fail must raise ValueError with a message that
    starts with the method name; the others must round-trip the data,
    which comes back lower-cased.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: bytes and exception objects reject a non-empty format spec,
        # so the messages below use the !r / !s conversions; ":r" / ":s"
        # would raise TypeError while building the failure message.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3453
def test_recv_zero(self):
    # Zero-length reads must return empty results, with or without
    # pending data.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    self.addCleanup(server.__exit__, None, None)
    plain = socket.create_connection((HOST, server.port))
    self.addCleanup(plain.close)
    s = test_wrap_socket(plain, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    for zero_read in (s.recv, s.read):
        self.assertEqual(zero_read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    empty_target = bytearray()
    self.assertEqual(s.recv_into(empty_target), 0)
3473
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise a "want read/write" error once
    # the buffers are full, rather than block.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        with self.assertRaises(would_block):
            while True:
                s.send(chunk)

        # Now read all the output and discard it
        s.setblocking(True)
        s.close()
3503
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    listening = threading.Event()
    stop = False

    def serve():
        # Accept TCP connections but never speak TLS, so that client
        # handshakes stall until their timeout expires.
        server.listen()
        listening.set()
        accepted = []
        while not stop:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(server.accept()[0])
        for sock in accepted:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrapping an already connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: connecting a socket that is already wrapped.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # serve() reads this flag through its closure and exits its loop.
        stop = True
        worker.join()
        server.close()
3552
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    server = context.wrap_socket(listener, server_side=True)
    self.assertTrue(server.server_side)

    ready = threading.Event()
    # Filled in by the server thread once a connection is accepted.
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Block on the accept and wait on the connection to close.
        ready.set()
        remote, peer = server.accept()
        # Echo four bytes back to the client.
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3593
def test_getpeercert_enotconn(self):
    # getpeercert() on an unconnected socket must fail with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        error_code = caught.exception.errno
        self.assertEqual(error_code, errno.ENOTCONN)
3600
def test_do_handshake_enotconn(self):
    # do_handshake() on an unconnected socket must fail with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        error_code = caught.exception.errno
        self.assertEqual(error_code, errno.ENOTCONN)
3607
def test_no_shared_ciphers(self):
    # A handshake between peers with disjoint cipher lists must fail.
    client_context, server_context, hostname = testing_context()
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    # Force different suites on client and server
    for ctx, suite in ((client_context, "AES128"),
                       (server_context, "AES256")):
        ctx.set_ciphers(suite)
    with ThreadedEchoServer(context=server_context) as server:
        address = (HOST, server.port)
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaises(OSError):
                s.connect(address)
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3621
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                chatty=False)
    with server as server:
        with context.wrap_socket(socket.socket()) as s:
            # Nothing is negotiated before connect().
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:
                # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # Once the socket's with-block has exited, the state is reset.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3645
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0-1.2 disabled, the connection must negotiate TLS 1.3
    # with one of the expected cipher suites.
    tls13_suites = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    older_versions = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    context.options |= older_versions
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            suite_name = s.cipher()[0]
            self.assertIn(suite_name, tls13_suites)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003663
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    # Check protocol negotiation under minimum/maximum version limits.
    client_context, server_context, hostname = testing_context()
    TLS = ssl.TLSVersion

    def negotiated_version():
        # Handshake with the contexts as currently configured and
        # return the protocol version reported by the client socket.
        with ThreadedEchoServer(context=server_context) as server:
            with client_context.wrap_socket(
                    socket.socket(), server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                return s.version()

    # client TLSv1.0 to 1.2
    client_context.minimum_version = TLS.TLSv1
    client_context.maximum_version = TLS.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = TLS.TLSv1_2
    server_context.maximum_version = TLS.TLSv1_2
    self.assertEqual(negotiated_version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = TLS.TLSv1
    server_context.maximum_version = TLS.TLSv1_1
    self.assertEqual(negotiated_version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = TLS.TLSv1_2
    server_context.maximum_version = TLS.TLSv1_2
    client_context.minimum_version = TLS.TLSv1
    client_context.maximum_version = TLS.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as s:
            with self.assertRaises(ssl.SSLError) as caught:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(caught.exception))
3702
3703
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    # Pin the client to SSLv3 and let the server accept it.
    client_context, server_context, hostname = testing_context()
    sslv3 = ssl.TLSVersion.SSLv3
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        endpoint = (HOST, server.port)
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect(endpoint)
            negotiated = s.version()
            self.assertEqual(negotiated, 'SSLv3')
3717
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    context.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3737
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive connections are made.  On each, the binding data must
    be present, have the expected length, and match what the peer reports
    for the "CB tls-unique" request.  The data of the two connections
    must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    def fetch_and_verify(sock, banner):
        # Retrieve the binding data of *sock*, sanity-check it, compare
        # it with the peer's view, and return it.
        binding = sock.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(banner.format(binding))

        # check if it is sane
        self.assertIsNotNone(binding)
        if sock.version() == 'TLSv1.3':
            self.assertEqual(len(binding), 48)
        else:
            self.assertEqual(len(binding), 12)  # True for TLSv1

        # and compare with the peers version
        sock.write(b"CB tls-unique\n")
        peer_data_repr = sock.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(binding).encode("us-ascii"))
        return binding

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cb_data = fetch_and_verify(
                s, " got channel binding data: {0!r}\n")

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # The sanity checks run against the data of THIS connection,
            # not the first connection's data.
            new_cb_data = fetch_and_verify(
                s, "got another channel binding data: {0!r}\n")
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003797
def test_compression(self):
    # The reported compression must be None or a known method name.
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(compression, allowed)
3806
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION on both sides, no compression is reported.
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    compression = stats['compression']
    self.assertIs(compression, None)
3817
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the reported cipher entry; it is split on "-"
    # below, i.e. treated as the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole name; indexing it again would only show its
        # first character.
        self.fail("Non-DH cipher: " + cipher)
3833
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    def restrict_server(ctx):
        # ECDHE ciphers only, TLS 1.0 and 1.1 disabled.
        ctx.set_ciphers("ECDHE:!eNULL:!aNULL")
        ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1

    def handshake(client_ctx, server_ctx, sni):
        return server_params_test(client_ctx, server_ctx,
                                  chatty=True, connectionchatty=True,
                                  sni_name=sni)

    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()
    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    handshake(client_context, server_context, hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    restrict_server(server_context)
    try:
        handshake(client_context, server_context, hostname)
    except ssl.SSLError:
        # The mismatch was rejected, as expected.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3872
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    client_context, server_context, hostname = testing_context()
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    chosen = stats['client_alpn_protocol']
    self.assertIs(chosen, None)
3880
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_context, server_context, hostname = testing_context()
    offered_by_server = ['foo', 'bar']
    server_context.set_alpn_protocols(offered_by_server)
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    chosen = stats['client_alpn_protocol']
    self.assertIs(chosen, None)
3890
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        # Keep a handshake error as the outcome; it is inspected below.
        try:
            outcome = server_params_test(client_context,
                                         server_context,
                                         chatty=True,
                                         connectionchatty=True,
                                         sni_name=hostname)
        except ssl.SSLError as exc:
            outcome = exc

        handshake_must_fail = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_must_fail:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(outcome, ssl.SSLError)
            continue

        # The two remaining %s placeholders are filled per assertion.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = outcome['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = outcome['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3930
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    client_context, server_context, hostname = testing_context()
    handshake_options = dict(chatty=True, connectionchatty=True,
                             sni_name=hostname)
    stats = server_params_test(client_context, server_context,
                               **handshake_options)
    chosen = stats['client_npn_protocol']
    self.assertIs(chosen, None)
3938
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which protocol both sides
    # report as selected.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        # The two remaining %s placeholders are filled per assertion.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3964
def sni_contexts(self):
    # Build the contexts used by the SNI callback tests: two server
    # contexts, each loaded with a different certificate chain, and a
    # client context that loads SIGNING_CA as its verify location.
    # Returned as (server, other server, client).
    server_side = []
    for chain in (SIGNED_CERTFILE, SIGNED_CERTFILE2):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(chain)
        server_side.append(ctx)
    primary, alternate = server_side

    verifying_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    verifying_client.load_verify_locations(SIGNING_CA)
    return primary, alternate, verifying_client
3973
def check_common_name(self, stats, name):
    # Assert that the peer certificate recorded in *stats* has a subject
    # entry consisting solely of the pair ('commonName', name).
    wanted_entry = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_entry, subject)
3977
@needs_sni
def test_sni_callback(self):
    seen = []
    primary_ctx, alternate_ctx, client_ctx = self.sni_contexts()

    # Arbitrary SNI names are sent below, so hostname checking is
    # switched off on the client.
    client_ctx.check_hostname = False

    def record_and_switch(ssl_sock, server_name, initial_context):
        # Log every invocation; swap in the alternate context only when
        # the client actually supplied a server name.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = alternate_ctx

    primary_ctx.set_servername_callback(record_and_switch)

    # 1) A server name is sent: the callback sees it together with the
    #    initial context, and the connection is served by alternate_ctx.
    result = server_params_test(client_ctx, primary_ctx,
                                chatty=True, sni_name='supermessage')
    self.assertEqual(seen, [("supermessage", primary_ctx)])
    # The certificate of the other context (SIGNED_CERTFILE2 in
    # sni_contexts) was served.
    self.check_common_name(result, 'fakehostname')

    # 2) No server name: the callback still runs, with server_name=None,
    #    and the original certificate is kept.
    seen = []
    result = server_params_test(client_ctx, primary_ctx,
                                chatty=True, sni_name=None)
    self.assertEqual(seen, [(None, primary_ctx)])
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)

    # 3) Callback disabled: nothing is recorded and the certificate
    #    does not change even though a server name is sent.
    seen = []
    primary_ctx.set_servername_callback(None)
    result = server_params_test(client_ctx, primary_ctx,
                                chatty=True, sni_name='notfunny')
    self.check_common_name(result, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
4018
@needs_sni
def test_sni_callback_alert(self):
    # An alert code returned by the servername callback is reflected to
    # the connecting client as an SSLError with the matching reason.
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_ctx.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_ctx, server_ctx,
                           chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004032
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback fails the connection
    # with a TLS handshake failure alert; the exception itself is
    # expected to show up on stderr.
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def divide_by_zero(ssl_sock, server_name, initial_context):
        1/0

    server_ctx.set_servername_callback(divide_by_zero)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as err_stream:
            server_params_test(client_ctx, server_ctx,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", err_stream.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004049
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning a value of the wrong type
    # terminates the TLS connection with an internal error alert; a
    # TypeError is expected to be reported on stderr.
    server_ctx, _other_ctx, client_ctx = self.sni_contexts()

    def return_a_string(ssl_sock, server_name, initial_context):
        return "foo"

    server_ctx.set_servername_callback(return_a_string)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as err_stream:
            server_params_test(client_ctx, server_ctx,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", err_stream.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004067
def test_shared_ciphers(self):
    client_ctx, server_ctx, sni_host = testing_context()
    # The client offers both AES sizes, the server only AES256; every
    # cipher the server reports as shared must match one of the
    # fragments below.
    client_ctx.set_ciphers("AES128:AES256")
    server_ctx.set_ciphers("AES256")
    acceptable = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_ctx, server_ctx, sni_name=sni_host)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _protocol, _bits in shared:
        recognised = False
        for fragment in acceptable:
            if fragment in cipher_name:
                recognised = True
                break
        if not recognised:
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004085
def test_read_write_after_close_raises_valuerror(self):
    client_ctx, server_ctx, sni_host = testing_context()
    echo_server = ThreadedEchoServer(context=server_ctx, chatty=False)

    with echo_server:
        tls_sock = client_ctx.wrap_socket(socket.socket(),
                                          server_hostname=sni_host)
        tls_sock.connect((HOST, echo_server.port))
        tls_sock.close()

        # After close(), both read() and write() must raise ValueError.
        with self.assertRaises(ValueError):
            tls_sock.read(1024)
        with self.assertRaises(ValueError):
            tls_sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004098
def test_sendfile(self):
    # Write a small file, push it through SSLSocket.sendfile() and
    # expect the echo server to send the same bytes back.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # A single context serves both the echo server and the client
    # socket, so it carries a cert chain and the CA to verify against.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    shared_ctx.verify_mode = ssl.CERT_REQUIRED
    shared_ctx.load_verify_locations(SIGNING_CA)
    shared_ctx.load_cert_chain(SIGNED_CERTFILE)

    echo_server = ThreadedEchoServer(context=shared_ctx, chatty=False)
    with echo_server, shared_ctx.wrap_socket(socket.socket()) as conn:
        conn.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as source:
            conn.sendfile(source)
            echoed = conn.recv(1024)
            self.assertEqual(echoed, payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004115
def test_session(self):
    client_ctx, server_ctx, sni_host = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_ctx.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server exchange using the contexts above.
        return server_params_test(client_ctx, server_ctx,
                                  sni_name=sni_host, **extra)

    def check_server_counters(accepted, hits):
        # Compare the server context's running session statistics.
        counters = server_ctx.session_stats()
        self.assertEqual(counters['accept'], accepted)
        self.assertEqual(counters['hits'], hits)

    # Phase 1: first connection, no session offered.
    result = handshake()
    original = result['session']
    self.assertTrue(original.id)
    self.assertGreater(original.time, 0)
    self.assertGreater(original.timeout, 0)
    self.assertTrue(original.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(original.ticket_lifetime_hint, 0)
    self.assertFalse(result['session_reused'])
    check_server_counters(1, 0)

    # Phase 2: offer the saved session; the server must count a hit.
    result = handshake(session=original)
    check_server_counters(2, 1)
    self.assertTrue(result['session_reused'])
    resumed = result['session']
    self.assertEqual(resumed.id, original.id)
    # Equal to the original session, yet a distinct Python object.
    self.assertEqual(resumed, original)
    self.assertIsNot(resumed, original)
    self.assertGreaterEqual(resumed.time, original.time)
    self.assertGreaterEqual(resumed.timeout, original.timeout)

    # Phase 3: another connection without a session gets a new one.
    result = handshake()
    self.assertFalse(result['session_reused'])
    fresh = result['session']
    self.assertNotEqual(fresh.id, original.id)
    self.assertNotEqual(fresh, original)
    check_server_counters(3, 1)

    # Phase 4: the first session can be reused a second time.
    result = handshake(session=original)
    self.assertTrue(result['session_reused'])
    resumed_again = result['session']
    self.assertEqual(resumed_again.id, original.id)
    self.assertEqual(resumed_again, original)
    self.assertGreaterEqual(resumed_again.time, original.time)
    self.assertGreaterEqual(resumed_again.timeout, original.timeout)
    check_server_counters(4, 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004173
def test_session_handling(self):
    # Checks the client-side rules for the SSLSocket.session attribute:
    # it is None before the handshake, rejects non-session values,
    # cannot be assigned after the handshake, can be assigned before
    # connecting (and is then reused), and cannot be moved to a socket
    # created from a different SSLContext.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            # Anything that is not a session object is rejected.
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment is guarded: it is the statement that
            # must raise, so nothing else belongs in this block.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004223
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004224
Christian Heimes9fb051f2018-09-23 08:32:31 +02004225@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4226class TestPostHandshakeAuth(unittest.TestCase):
4227 def test_pha_setter(self):
4228 protocols = [
4229 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4230 ]
4231 for protocol in protocols:
4232 ctx = ssl.SSLContext(protocol)
4233 self.assertEqual(ctx.post_handshake_auth, False)
4234
4235 ctx.post_handshake_auth = True
4236 self.assertEqual(ctx.post_handshake_auth, True)
4237
4238 ctx.verify_mode = ssl.CERT_REQUIRED
4239 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4240 self.assertEqual(ctx.post_handshake_auth, True)
4241
4242 ctx.post_handshake_auth = False
4243 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4244 self.assertEqual(ctx.post_handshake_auth, False)
4245
4246 ctx.verify_mode = ssl.CERT_OPTIONAL
4247 ctx.post_handshake_auth = True
4248 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4249 self.assertEqual(ctx.post_handshake_auth, True)
4250
4251 def test_pha_required(self):
4252 client_context, server_context, hostname = testing_context()
4253 server_context.post_handshake_auth = True
4254 server_context.verify_mode = ssl.CERT_REQUIRED
4255 client_context.post_handshake_auth = True
4256 client_context.load_cert_chain(SIGNED_CERTFILE)
4257
4258 server = ThreadedEchoServer(context=server_context, chatty=False)
4259 with server:
4260 with client_context.wrap_socket(socket.socket(),
4261 server_hostname=hostname) as s:
4262 s.connect((HOST, server.port))
4263 s.write(b'HASCERT')
4264 self.assertEqual(s.recv(1024), b'FALSE\n')
4265 s.write(b'PHA')
4266 self.assertEqual(s.recv(1024), b'OK\n')
4267 s.write(b'HASCERT')
4268 self.assertEqual(s.recv(1024), b'TRUE\n')
4269 # PHA method just returns true when cert is already available
4270 s.write(b'PHA')
4271 self.assertEqual(s.recv(1024), b'OK\n')
4272 s.write(b'GETCERT')
4273 cert_text = s.recv(4096).decode('us-ascii')
4274 self.assertIn('Python Software Foundation CA', cert_text)
4275
4276 def test_pha_required_nocert(self):
4277 client_context, server_context, hostname = testing_context()
4278 server_context.post_handshake_auth = True
4279 server_context.verify_mode = ssl.CERT_REQUIRED
4280 client_context.post_handshake_auth = True
4281
4282 server = ThreadedEchoServer(context=server_context, chatty=False)
4283 with server:
4284 with client_context.wrap_socket(socket.socket(),
4285 server_hostname=hostname) as s:
4286 s.connect((HOST, server.port))
4287 s.write(b'PHA')
4288 # receive CertificateRequest
4289 self.assertEqual(s.recv(1024), b'OK\n')
4290 # send empty Certificate + Finish
4291 s.write(b'HASCERT')
4292 # receive alert
4293 with self.assertRaisesRegex(
4294 ssl.SSLError,
4295 'tlsv13 alert certificate required'):
4296 s.recv(1024)
4297
4298 def test_pha_optional(self):
4299 if support.verbose:
4300 sys.stdout.write("\n")
4301
4302 client_context, server_context, hostname = testing_context()
4303 server_context.post_handshake_auth = True
4304 server_context.verify_mode = ssl.CERT_REQUIRED
4305 client_context.post_handshake_auth = True
4306 client_context.load_cert_chain(SIGNED_CERTFILE)
4307
4308 # check CERT_OPTIONAL
4309 server_context.verify_mode = ssl.CERT_OPTIONAL
4310 server = ThreadedEchoServer(context=server_context, chatty=False)
4311 with server:
4312 with client_context.wrap_socket(socket.socket(),
4313 server_hostname=hostname) as s:
4314 s.connect((HOST, server.port))
4315 s.write(b'HASCERT')
4316 self.assertEqual(s.recv(1024), b'FALSE\n')
4317 s.write(b'PHA')
4318 self.assertEqual(s.recv(1024), b'OK\n')
4319 s.write(b'HASCERT')
4320 self.assertEqual(s.recv(1024), b'TRUE\n')
4321
4322 def test_pha_optional_nocert(self):
4323 if support.verbose:
4324 sys.stdout.write("\n")
4325
4326 client_context, server_context, hostname = testing_context()
4327 server_context.post_handshake_auth = True
4328 server_context.verify_mode = ssl.CERT_OPTIONAL
4329 client_context.post_handshake_auth = True
4330
4331 server = ThreadedEchoServer(context=server_context, chatty=False)
4332 with server:
4333 with client_context.wrap_socket(socket.socket(),
4334 server_hostname=hostname) as s:
4335 s.connect((HOST, server.port))
4336 s.write(b'HASCERT')
4337 self.assertEqual(s.recv(1024), b'FALSE\n')
4338 s.write(b'PHA')
4339 self.assertEqual(s.recv(1024), b'OK\n')
4340 # optional doens't fail when client does not have a cert
4341 s.write(b'HASCERT')
4342 self.assertEqual(s.recv(1024), b'FALSE\n')
4343
4344 def test_pha_no_pha_client(self):
4345 client_context, server_context, hostname = testing_context()
4346 server_context.post_handshake_auth = True
4347 server_context.verify_mode = ssl.CERT_REQUIRED
4348 client_context.load_cert_chain(SIGNED_CERTFILE)
4349
4350 server = ThreadedEchoServer(context=server_context, chatty=False)
4351 with server:
4352 with client_context.wrap_socket(socket.socket(),
4353 server_hostname=hostname) as s:
4354 s.connect((HOST, server.port))
4355 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4356 s.verify_client_post_handshake()
4357 s.write(b'PHA')
4358 self.assertIn(b'extension not received', s.recv(1024))
4359
4360 def test_pha_no_pha_server(self):
4361 # server doesn't have PHA enabled, cert is requested in handshake
4362 client_context, server_context, hostname = testing_context()
4363 server_context.verify_mode = ssl.CERT_REQUIRED
4364 client_context.post_handshake_auth = True
4365 client_context.load_cert_chain(SIGNED_CERTFILE)
4366
4367 server = ThreadedEchoServer(context=server_context, chatty=False)
4368 with server:
4369 with client_context.wrap_socket(socket.socket(),
4370 server_hostname=hostname) as s:
4371 s.connect((HOST, server.port))
4372 s.write(b'HASCERT')
4373 self.assertEqual(s.recv(1024), b'TRUE\n')
4374 # PHA doesn't fail if there is already a cert
4375 s.write(b'PHA')
4376 self.assertEqual(s.recv(1024), b'OK\n')
4377 s.write(b'HASCERT')
4378 self.assertEqual(s.recv(1024), b'TRUE\n')
4379
4380 def test_pha_not_tls13(self):
4381 # TLS 1.2
4382 client_context, server_context, hostname = testing_context()
4383 server_context.verify_mode = ssl.CERT_REQUIRED
4384 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4385 client_context.post_handshake_auth = True
4386 client_context.load_cert_chain(SIGNED_CERTFILE)
4387
4388 server = ThreadedEchoServer(context=server_context, chatty=False)
4389 with server:
4390 with client_context.wrap_socket(socket.socket(),
4391 server_hostname=hostname) as s:
4392 s.connect((HOST, server.port))
4393 # PHA fails for TLS != 1.3
4394 s.write(b'PHA')
4395 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4396
4397
Thomas Woutersed03b412007-08-28 21:37:11 +00004398def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004399 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004400 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004401 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004402 'Mac': platform.mac_ver,
4403 'Windows': platform.win32_ver,
4404 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004405 for name, func in plats.items():
4406 plat = func()
4407 if plat and plat[0]:
4408 plat = '%s %r' % (name, plat)
4409 break
4410 else:
4411 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004412 print("test_ssl: testing with %r %r" %
4413 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4414 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004415 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004416 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4417 try:
4418 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4419 except AttributeError:
4420 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004421
Antoine Pitrou152efa22010-05-16 18:19:27 +00004422 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004423 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004424 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004425 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004426 BADCERT, BADKEY, EMPTYCERT]:
4427 if not os.path.exists(filename):
4428 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004429
Martin Panter3840b2a2016-03-27 01:53:46 +00004430 tests = [
4431 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004432 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes9fb051f2018-09-23 08:32:31 +02004433 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004434 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004435
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004436 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004437 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004438
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004439 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004440 try:
4441 support.run_unittest(*tests)
4442 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004443 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004444
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()