blob: 9e571cc78e4b07036dd873939c8c96f8cadbc902 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
Antoine Pitrou05d936d2010-10-13 11:38:36 +000026ssl = support.import_module("ssl")
27
Martin Panter3840b2a2016-03-27 01:53:46 +000028
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010029PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000030HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020031IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010032IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
33IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Christian Heimesefff7062013-11-21 03:35:02 +010036def data_file(*name):
37 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Antoine Pitrou81564092010-10-08 23:06:24 +000039# The custom key and certificate files used in test_ssl are generated
40# using Lib/test/make_ssl_certs.py.
41# Other certificates are simply fetched from the Internet servers they
42# are meant to authenticate.
43
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000046ONLYCERT = data_file("ssl_cert.pem")
47ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000048BYTES_ONLYCERT = os.fsencode(ONLYCERT)
49BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020050CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
51ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
52KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000053CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000054BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010055CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
56CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
Christian Heimesbd5c7d22018-01-20 15:16:30 +010058CERTFILE_INFO = {
59 'issuer': ((('countryName', 'XY'),),
60 (('localityName', 'Castle Anthrax'),),
61 (('organizationName', 'Python Software Foundation'),),
62 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020063 'notAfter': 'Aug 26 14:23:15 2028 GMT',
64 'notBefore': 'Aug 29 14:23:15 2018 GMT',
65 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010066 'subject': ((('countryName', 'XY'),),
67 (('localityName', 'Castle Anthrax'),),
68 (('organizationName', 'Python Software Foundation'),),
69 (('commonName', 'localhost'),)),
70 'subjectAltName': (('DNS', 'localhost'),),
71 'version': 3
72}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
Christian Heimes22587792013-11-21 23:56:13 +010074# empty CRL
75CRLFILE = data_file("revocation.crl")
76
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010077# Two keys and certs signed by the same CA (for SNI tests)
78SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020079SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
81SIGNED_CERTFILE_INFO = {
82 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
83 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
84 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
85 'issuer': ((('countryName', 'XY'),),
86 (('organizationName', 'Python Software Foundation CA'),),
87 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020088 'notAfter': 'Jul 7 14:23:16 2028 GMT',
89 'notBefore': 'Aug 29 14:23:16 2018 GMT',
90 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010091 'subject': ((('countryName', 'XY'),),
92 (('localityName', 'Castle Anthrax'),),
93 (('organizationName', 'Python Software Foundation'),),
94 (('commonName', 'localhost'),)),
95 'subjectAltName': (('DNS', 'localhost'),),
96 'version': 3
97}
98
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010099SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
102SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
103
Martin Panter3840b2a2016-03-27 01:53:46 +0000104# Same certificate as pycacert.pem, but without extra text in file
105SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200106# cert with all kinds of subject alt names
107ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100108IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100109
Martin Panter3d81d932016-01-14 09:36:00 +0000110REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000111
112EMPTYCERT = data_file("nullcert.pem")
113BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000114NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000115BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200116NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200117NULLBYTECERT = data_file("nullbytecert.pem")
Christian Heimesa37f5242019-01-15 23:47:42 +0100118TALOS_INVALID_CRLDP = data_file("talos-2019-0758.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Christian Heimesb7b92252018-02-25 09:49:31 +0100148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100183needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
184
Antoine Pitrou23df4832010-08-04 17:14:06 +0000185
Christian Heimesd0486372016-09-10 23:23:33 +0200186def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
187 cert_reqs=ssl.CERT_NONE, ca_certs=None,
188 ciphers=None, certfile=None, keyfile=None,
189 **kwargs):
190 context = ssl.SSLContext(ssl_version)
191 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200192 if cert_reqs == ssl.CERT_NONE:
193 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200194 context.verify_mode = cert_reqs
195 if ca_certs is not None:
196 context.load_verify_locations(ca_certs)
197 if certfile is not None or keyfile is not None:
198 context.load_cert_chain(certfile, keyfile)
199 if ciphers is not None:
200 context.set_ciphers(ciphers)
201 return context.wrap_socket(sock, **kwargs)
202
Christian Heimesa170fa12017-09-15 20:27:30 +0200203
204def testing_context(server_cert=SIGNED_CERTFILE):
205 """Create context
206
207 client_context, server_context, hostname = testing_context()
208 """
209 if server_cert == SIGNED_CERTFILE:
210 hostname = SIGNED_CERTFILE_HOSTNAME
211 elif server_cert == SIGNED_CERTFILE2:
212 hostname = SIGNED_CERTFILE2_HOSTNAME
213 else:
214 raise ValueError(server_cert)
215
216 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
217 client_context.load_verify_locations(SIGNING_CA)
218
219 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
220 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200221 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200222
223 return client_context, server_context, hostname
224
225
Antoine Pitrou152efa22010-05-16 18:19:27 +0000226class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000227
Antoine Pitrou480a1242010-04-28 21:37:09 +0000228 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000229 ssl.CERT_NONE
230 ssl.CERT_OPTIONAL
231 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100232 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100233 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100234 if ssl.HAS_ECDH:
235 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100236 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
237 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000238 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100239 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700240 ssl.OP_NO_SSLv2
241 ssl.OP_NO_SSLv3
242 ssl.OP_NO_TLSv1
243 ssl.OP_NO_TLSv1_3
244 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
245 ssl.OP_NO_TLSv1_1
246 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200247 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000248
Christian Heimes9d50ab52018-02-27 10:17:30 +0100249 def test_private_init(self):
250 with self.assertRaisesRegex(TypeError, "public constructor"):
251 with socket.socket() as s:
252 ssl.SSLSocket(s)
253
Antoine Pitrou172f0252014-04-18 20:33:08 +0200254 def test_str_for_enums(self):
255 # Make sure that the PROTOCOL_* constants have enum-like string
256 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200257 proto = ssl.PROTOCOL_TLS
258 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200259 ctx = ssl.SSLContext(proto)
260 self.assertIs(ctx.protocol, proto)
261
Antoine Pitrou480a1242010-04-28 21:37:09 +0000262 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000263 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000265 sys.stdout.write("\n RAND_status is %d (%s)\n"
266 % (v, (v and "sufficient randomness") or
267 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200268
269 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
270 self.assertEqual(len(data), 16)
271 self.assertEqual(is_cryptographic, v == 1)
272 if v:
273 data = ssl.RAND_bytes(16)
274 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200275 else:
276 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200277
Victor Stinner1e81a392013-12-19 16:47:04 +0100278 # negative num is invalid
279 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
280 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
281
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100282 if hasattr(ssl, 'RAND_egd'):
283 self.assertRaises(TypeError, ssl.RAND_egd, 1)
284 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000285 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200286 ssl.RAND_add(b"this is a random bytes object", 75.0)
287 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000288
Christian Heimesf77b4b22013-08-21 13:26:05 +0200289 @unittest.skipUnless(os.name == 'posix', 'requires posix')
290 def test_random_fork(self):
291 status = ssl.RAND_status()
292 if not status:
293 self.fail("OpenSSL's PRNG has insufficient randomness")
294
295 rfd, wfd = os.pipe()
296 pid = os.fork()
297 if pid == 0:
298 try:
299 os.close(rfd)
300 child_random = ssl.RAND_pseudo_bytes(16)[0]
301 self.assertEqual(len(child_random), 16)
302 os.write(wfd, child_random)
303 os.close(wfd)
304 except BaseException:
305 os._exit(1)
306 else:
307 os._exit(0)
308 else:
309 os.close(wfd)
310 self.addCleanup(os.close, rfd)
311 _, status = os.waitpid(pid, 0)
312 self.assertEqual(status, 0)
313
314 child_random = os.read(rfd, 16)
315 self.assertEqual(len(child_random), 16)
316 parent_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(parent_random), 16)
318
319 self.assertNotEqual(child_random, parent_random)
320
Christian Heimese6dac002018-08-30 07:25:49 +0200321 maxDiff = None
322
Antoine Pitrou480a1242010-04-28 21:37:09 +0000323 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000324 # note that this uses an 'unofficial' function in _ssl.c,
325 # provided solely for this test, to exercise the certificate
326 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100327 self.assertEqual(
328 ssl._ssl._test_decode_cert(CERTFILE),
329 CERTFILE_INFO
330 )
331 self.assertEqual(
332 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
333 SIGNED_CERTFILE_INFO
334 )
335
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200336 # Issue #13034: the subjectAltName in some certificates
337 # (notably projects.developer.nokia.com:443) wasn't parsed
338 p = ssl._ssl._test_decode_cert(NOKIACERT)
339 if support.verbose:
340 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
341 self.assertEqual(p['subjectAltName'],
342 (('DNS', 'projects.developer.nokia.com'),
343 ('DNS', 'projects.forum.nokia.com'))
344 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100345 # extra OCSP and AIA fields
346 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
347 self.assertEqual(p['caIssuers'],
348 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
349 self.assertEqual(p['crlDistributionPoints'],
350 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000351
Christian Heimesa37f5242019-01-15 23:47:42 +0100352 def test_parse_cert_CVE_2019_5010(self):
353 p = ssl._ssl._test_decode_cert(TALOS_INVALID_CRLDP)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 self.assertEqual(
357 p,
358 {
359 'issuer': (
360 (('countryName', 'UK'),), (('commonName', 'cody-ca'),)),
361 'notAfter': 'Jun 14 18:00:58 2028 GMT',
362 'notBefore': 'Jun 18 18:00:58 2018 GMT',
363 'serialNumber': '02',
364 'subject': ((('countryName', 'UK'),),
365 (('commonName',
366 'codenomicon-vm-2.test.lal.cisco.com'),)),
367 'subjectAltName': (
368 ('DNS', 'codenomicon-vm-2.test.lal.cisco.com'),),
369 'version': 3
370 }
371 )
372
Christian Heimes824f7f32013-08-17 00:54:47 +0200373 def test_parse_cert_CVE_2013_4238(self):
374 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
375 if support.verbose:
376 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
377 subject = ((('countryName', 'US'),),
378 (('stateOrProvinceName', 'Oregon'),),
379 (('localityName', 'Beaverton'),),
380 (('organizationName', 'Python Software Foundation'),),
381 (('organizationalUnitName', 'Python Core Development'),),
382 (('commonName', 'null.python.org\x00example.org'),),
383 (('emailAddress', 'python-dev@python.org'),))
384 self.assertEqual(p['subject'], subject)
385 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200386 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
387 san = (('DNS', 'altnull.python.org\x00example.com'),
388 ('email', 'null@python.org\x00user@example.org'),
389 ('URI', 'http://null.python.org\x00http://example.org'),
390 ('IP Address', '192.0.2.1'),
391 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
392 else:
393 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
394 san = (('DNS', 'altnull.python.org\x00example.com'),
395 ('email', 'null@python.org\x00user@example.org'),
396 ('URI', 'http://null.python.org\x00http://example.org'),
397 ('IP Address', '192.0.2.1'),
398 ('IP Address', '<invalid>'))
399
400 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200401
Christian Heimes1c03abd2016-09-06 23:25:35 +0200402 def test_parse_all_sans(self):
403 p = ssl._ssl._test_decode_cert(ALLSANFILE)
404 self.assertEqual(p['subjectAltName'],
405 (
406 ('DNS', 'allsans'),
407 ('othername', '<unsupported>'),
408 ('othername', '<unsupported>'),
409 ('email', 'user@example.org'),
410 ('DNS', 'www.example.org'),
411 ('DirName',
412 ((('countryName', 'XY'),),
413 (('localityName', 'Castle Anthrax'),),
414 (('organizationName', 'Python Software Foundation'),),
415 (('commonName', 'dirname example'),))),
416 ('URI', 'https://www.python.org/'),
417 ('IP Address', '127.0.0.1'),
418 ('IP Address', '0:0:0:0:0:0:0:1\n'),
419 ('Registered ID', '1.2.3.4.5')
420 )
421 )
422
Antoine Pitrou480a1242010-04-28 21:37:09 +0000423 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000424 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000425 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000426 d1 = ssl.PEM_cert_to_DER_cert(pem)
427 p2 = ssl.DER_cert_to_PEM_cert(d1)
428 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000429 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000430 if not p2.startswith(ssl.PEM_HEADER + '\n'):
431 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
432 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
433 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000434
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435 def test_openssl_version(self):
436 n = ssl.OPENSSL_VERSION_NUMBER
437 t = ssl.OPENSSL_VERSION_INFO
438 s = ssl.OPENSSL_VERSION
439 self.assertIsInstance(n, int)
440 self.assertIsInstance(t, tuple)
441 self.assertIsInstance(s, str)
442 # Some sanity checks follow
443 # >= 0.9
444 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400445 # < 3.0
446 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000447 major, minor, fix, patch, status = t
448 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400449 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000450 self.assertGreaterEqual(minor, 0)
451 self.assertLess(minor, 256)
452 self.assertGreaterEqual(fix, 0)
453 self.assertLess(fix, 256)
454 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100455 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000456 self.assertGreaterEqual(status, 0)
457 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400458 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200459 if IS_LIBRESSL:
460 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100461 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400462 else:
463 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100464 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000465
Antoine Pitrou9d543662010-04-23 23:10:32 +0000466 @support.cpython_only
467 def test_refcycle(self):
468 # Issue #7943: an SSL object doesn't create reference cycles with
469 # itself.
470 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200471 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000472 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100473 with support.check_warnings(("", ResourceWarning)):
474 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100475 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000476
Antoine Pitroua468adc2010-09-14 14:43:44 +0000477 def test_wrapped_unconnected(self):
478 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200479 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000480 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200481 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100482 self.assertRaises(OSError, ss.recv, 1)
483 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
484 self.assertRaises(OSError, ss.recvfrom, 1)
485 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
486 self.assertRaises(OSError, ss.send, b'x')
487 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200488 self.assertRaises(NotImplementedError, ss.dup)
Christian Heimes141c5e82018-02-24 21:10:57 +0100489 self.assertRaises(NotImplementedError, ss.sendmsg,
490 [b'x'], (), 0, ('0.0.0.0', 0))
Serhiy Storchaka42b1d612018-12-06 22:36:55 +0200491 self.assertRaises(NotImplementedError, ss.recvmsg, 100)
492 self.assertRaises(NotImplementedError, ss.recvmsg_into,
493 [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000494
Antoine Pitrou40f08742010-04-24 22:04:40 +0000495 def test_timeout(self):
496 # Issue #8524: when creating an SSL socket, the timeout of the
497 # original socket should be retained.
498 for timeout in (None, 0.0, 5.0):
499 s = socket.socket(socket.AF_INET)
500 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200501 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100502 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000503
Christian Heimesd0486372016-09-10 23:23:33 +0200504 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000505 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000506 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000507 "certfile must be specified",
508 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000509 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000510 "certfile must be specified for server-side operations",
511 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000512 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000513 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200514 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100515 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
516 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200517 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200518 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000519 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000520 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000521 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200522 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000523 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000524 ssl.wrap_socket(sock,
525 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000526 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200527 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000528 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000529 ssl.wrap_socket(sock,
530 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000531 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000532
Martin Panter3464ea22016-02-01 21:58:11 +0000533 def bad_cert_test(self, certfile):
534 """Check that trying to use the given client certificate fails"""
535 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
536 certfile)
537 sock = socket.socket()
538 self.addCleanup(sock.close)
539 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200540 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200541 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000542
543 def test_empty_cert(self):
544 """Wrapping with an empty cert file"""
545 self.bad_cert_test("nullcert.pem")
546
547 def test_malformed_cert(self):
548 """Wrapping with a badly formatted certificate (syntax error)"""
549 self.bad_cert_test("badcert.pem")
550
551 def test_malformed_key(self):
552 """Wrapping with a badly formatted key (syntax error)"""
553 self.bad_cert_test("badkey.pem")
554
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 def test_match_hostname(self):
556 def ok(cert, hostname):
557 ssl.match_hostname(cert, hostname)
558 def fail(cert, hostname):
559 self.assertRaises(ssl.CertificateError,
560 ssl.match_hostname, cert, hostname)
561
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100562 # -- Hostname matching --
563
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000564 cert = {'subject': ((('commonName', 'example.com'),),)}
565 ok(cert, 'example.com')
566 ok(cert, 'ExAmple.cOm')
567 fail(cert, 'www.example.com')
568 fail(cert, '.example.com')
569 fail(cert, 'example.org')
570 fail(cert, 'exampleXcom')
571
572 cert = {'subject': ((('commonName', '*.a.com'),),)}
573 ok(cert, 'foo.a.com')
574 fail(cert, 'bar.foo.a.com')
575 fail(cert, 'a.com')
576 fail(cert, 'Xa.com')
577 fail(cert, '.a.com')
578
Mandeep Singhede2ac92017-11-27 04:01:27 +0530579 # only match wildcards when they are the only thing
580 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000581 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530582 fail(cert, 'foo.com')
583 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000584 fail(cert, 'bar.com')
585 fail(cert, 'foo.a.com')
586 fail(cert, 'bar.foo.com')
587
Christian Heimes824f7f32013-08-17 00:54:47 +0200588 # NULL bytes are bad, CVE-2013-4073
589 cert = {'subject': ((('commonName',
590 'null.python.org\x00example.org'),),)}
591 ok(cert, 'null.python.org\x00example.org') # or raise an error?
592 fail(cert, 'example.org')
593 fail(cert, 'null.python.org')
594
Georg Brandl72c98d32013-10-27 07:16:53 +0100595 # error cases with wildcards
596 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
597 fail(cert, 'bar.foo.a.com')
598 fail(cert, 'a.com')
599 fail(cert, 'Xa.com')
600 fail(cert, '.a.com')
601
602 cert = {'subject': ((('commonName', 'a.*.com'),),)}
603 fail(cert, 'a.foo.com')
604 fail(cert, 'a..com')
605 fail(cert, 'a.com')
606
607 # wildcard doesn't match IDNA prefix 'xn--'
608 idna = 'püthon.python.org'.encode("idna").decode("ascii")
609 cert = {'subject': ((('commonName', idna),),)}
610 ok(cert, idna)
611 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
612 fail(cert, idna)
613 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
614 fail(cert, idna)
615
616 # wildcard in first fragment and IDNA A-labels in sequent fragments
617 # are supported.
618 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
619 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530620 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
621 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100622 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
623 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
624
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000625 # Slightly fake real-world example
626 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
627 'subject': ((('commonName', 'linuxfrz.org'),),),
628 'subjectAltName': (('DNS', 'linuxfr.org'),
629 ('DNS', 'linuxfr.com'),
630 ('othername', '<unsupported>'))}
631 ok(cert, 'linuxfr.org')
632 ok(cert, 'linuxfr.com')
633 # Not a "DNS" entry
634 fail(cert, '<unsupported>')
635 # When there is a subjectAltName, commonName isn't used
636 fail(cert, 'linuxfrz.org')
637
638 # A pristine real-world example
639 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
640 'subject': ((('countryName', 'US'),),
641 (('stateOrProvinceName', 'California'),),
642 (('localityName', 'Mountain View'),),
643 (('organizationName', 'Google Inc'),),
644 (('commonName', 'mail.google.com'),))}
645 ok(cert, 'mail.google.com')
646 fail(cert, 'gmail.com')
647 # Only commonName is considered
648 fail(cert, 'California')
649
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100650 # -- IPv4 matching --
651 cert = {'subject': ((('commonName', 'example.com'),),),
652 'subjectAltName': (('DNS', 'example.com'),
653 ('IP Address', '10.11.12.13'),
654 ('IP Address', '14.15.16.17'))}
655 ok(cert, '10.11.12.13')
656 ok(cert, '14.15.16.17')
657 fail(cert, '14.15.16.18')
658 fail(cert, 'example.net')
659
660 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100661 if hasattr(socket, 'AF_INET6'):
662 cert = {'subject': ((('commonName', 'example.com'),),),
663 'subjectAltName': (
664 ('DNS', 'example.com'),
665 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
666 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
667 ok(cert, '2001::cafe')
668 ok(cert, '2003::baba')
669 fail(cert, '2003::bebe')
670 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100671
672 # -- Miscellaneous --
673
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000674 # Neither commonName nor subjectAltName
675 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
676 'subject': ((('countryName', 'US'),),
677 (('stateOrProvinceName', 'California'),),
678 (('localityName', 'Mountain View'),),
679 (('organizationName', 'Google Inc'),))}
680 fail(cert, 'mail.google.com')
681
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200682 # No DNS entry in subjectAltName but a commonName
683 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
684 'subject': ((('countryName', 'US'),),
685 (('stateOrProvinceName', 'California'),),
686 (('localityName', 'Mountain View'),),
687 (('commonName', 'mail.google.com'),)),
688 'subjectAltName': (('othername', 'blabla'), )}
689 ok(cert, 'mail.google.com')
690
691 # No DNS entry subjectAltName and no commonName
692 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
693 'subject': ((('countryName', 'US'),),
694 (('stateOrProvinceName', 'California'),),
695 (('localityName', 'Mountain View'),),
696 (('organizationName', 'Google Inc'),)),
697 'subjectAltName': (('othername', 'blabla'),)}
698 fail(cert, 'google.com')
699
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000700 # Empty cert / no cert
701 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
702 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
703
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200704 # Issue #17980: avoid denials of service by refusing more than one
705 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100706 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
707 with self.assertRaisesRegex(
708 ssl.CertificateError,
709 "partial wildcards in leftmost label are not supported"):
710 ssl.match_hostname(cert, 'axxb.example.com')
711
712 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
713 with self.assertRaisesRegex(
714 ssl.CertificateError,
715 "wildcard can only be present in the leftmost label"):
716 ssl.match_hostname(cert, 'www.sub.example.com')
717
718 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
719 with self.assertRaisesRegex(
720 ssl.CertificateError,
721 "too many wildcards"):
722 ssl.match_hostname(cert, 'axxbxxc.example.com')
723
724 cert = {'subject': ((('commonName', '*'),),)}
725 with self.assertRaisesRegex(
726 ssl.CertificateError,
727 "sole wildcard without additional labels are not support"):
728 ssl.match_hostname(cert, 'host')
729
730 cert = {'subject': ((('commonName', '*.com'),),)}
731 with self.assertRaisesRegex(
732 ssl.CertificateError,
733 r"hostname 'com' doesn't match '\*.com'"):
734 ssl.match_hostname(cert, 'com')
735
736 # extra checks for _inet_paton()
737 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
738 with self.assertRaises(ValueError):
739 ssl._inet_paton(invalid)
740 for ipaddr in ['127.0.0.1', '192.168.0.1']:
741 self.assertTrue(ssl._inet_paton(ipaddr))
742 if hasattr(socket, 'AF_INET6'):
743 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
744 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200745
Antoine Pitroud5323212010-10-22 18:19:07 +0000746 def test_server_side(self):
747 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200748 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000749 with socket.socket() as sock:
750 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
751 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000752
Antoine Pitroud6494802011-07-21 01:11:30 +0200753 def test_unknown_channel_binding(self):
754 # should raise ValueError for unknown type
755 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200756 s.bind(('127.0.0.1', 0))
757 s.listen()
758 c = socket.socket(socket.AF_INET)
759 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200760 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100761 with self.assertRaises(ValueError):
762 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200763 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200764
765 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
766 "'tls-unique' channel binding not available")
767 def test_tls_unique_channel_binding(self):
768 # unconnected should return None for known type
769 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200770 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100771 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200772 # the same for server-side
773 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200774 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100775 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200776
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600777 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200778 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600779 r = repr(ss)
780 with self.assertWarns(ResourceWarning) as cm:
781 ss = None
782 support.gc_collect()
783 self.assertIn(r, str(cm.warning.args[0]))
784
Christian Heimes6d7ad132013-06-09 18:02:55 +0200785 def test_get_default_verify_paths(self):
786 paths = ssl.get_default_verify_paths()
787 self.assertEqual(len(paths), 6)
788 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
789
790 with support.EnvironmentVarGuard() as env:
791 env["SSL_CERT_DIR"] = CAPATH
792 env["SSL_CERT_FILE"] = CERTFILE
793 paths = ssl.get_default_verify_paths()
794 self.assertEqual(paths.cafile, CERTFILE)
795 self.assertEqual(paths.capath, CAPATH)
796
Christian Heimes44109d72013-11-22 01:51:30 +0100797 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
798 def test_enum_certificates(self):
799 self.assertTrue(ssl.enum_certificates("CA"))
800 self.assertTrue(ssl.enum_certificates("ROOT"))
801
802 self.assertRaises(TypeError, ssl.enum_certificates)
803 self.assertRaises(WindowsError, ssl.enum_certificates, "")
804
Christian Heimesc2d65e12013-11-22 16:13:55 +0100805 trust_oids = set()
806 for storename in ("CA", "ROOT"):
807 store = ssl.enum_certificates(storename)
808 self.assertIsInstance(store, list)
809 for element in store:
810 self.assertIsInstance(element, tuple)
811 self.assertEqual(len(element), 3)
812 cert, enc, trust = element
813 self.assertIsInstance(cert, bytes)
814 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
815 self.assertIsInstance(trust, (set, bool))
816 if isinstance(trust, set):
817 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100818
819 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100820 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200821
Christian Heimes46bebee2013-06-09 19:03:31 +0200822 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100823 def test_enum_crls(self):
824 self.assertTrue(ssl.enum_crls("CA"))
825 self.assertRaises(TypeError, ssl.enum_crls)
826 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200827
Christian Heimes44109d72013-11-22 01:51:30 +0100828 crls = ssl.enum_crls("CA")
829 self.assertIsInstance(crls, list)
830 for element in crls:
831 self.assertIsInstance(element, tuple)
832 self.assertEqual(len(element), 2)
833 self.assertIsInstance(element[0], bytes)
834 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200835
Christian Heimes46bebee2013-06-09 19:03:31 +0200836
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100837 def test_asn1object(self):
838 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
839 '1.3.6.1.5.5.7.3.1')
840
841 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
842 self.assertEqual(val, expected)
843 self.assertEqual(val.nid, 129)
844 self.assertEqual(val.shortname, 'serverAuth')
845 self.assertEqual(val.longname, 'TLS Web Server Authentication')
846 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
847 self.assertIsInstance(val, ssl._ASN1Object)
848 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
849
850 val = ssl._ASN1Object.fromnid(129)
851 self.assertEqual(val, expected)
852 self.assertIsInstance(val, ssl._ASN1Object)
853 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100854 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
855 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100856 for i in range(1000):
857 try:
858 obj = ssl._ASN1Object.fromnid(i)
859 except ValueError:
860 pass
861 else:
862 self.assertIsInstance(obj.nid, int)
863 self.assertIsInstance(obj.shortname, str)
864 self.assertIsInstance(obj.longname, str)
865 self.assertIsInstance(obj.oid, (str, type(None)))
866
867 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
868 self.assertEqual(val, expected)
869 self.assertIsInstance(val, ssl._ASN1Object)
870 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
871 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
872 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100873 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
874 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100875
Christian Heimes72d28502013-11-23 13:56:58 +0100876 def test_purpose_enum(self):
877 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
878 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
879 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
880 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
881 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
882 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
883 '1.3.6.1.5.5.7.3.1')
884
885 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
886 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
887 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
888 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
889 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
890 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
891 '1.3.6.1.5.5.7.3.2')
892
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100893 def test_unsupported_dtls(self):
894 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
895 self.addCleanup(s.close)
896 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200897 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100898 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200899 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100900 with self.assertRaises(NotImplementedError) as cx:
901 ctx.wrap_socket(s)
902 self.assertEqual(str(cx.exception), "only stream sockets are supported")
903
Antoine Pitrouc695c952014-04-28 20:57:36 +0200904 def cert_time_ok(self, timestring, timestamp):
905 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
906
907 def cert_time_fail(self, timestring):
908 with self.assertRaises(ValueError):
909 ssl.cert_time_to_seconds(timestring)
910
911 @unittest.skipUnless(utc_offset(),
912 'local time needs to be different from UTC')
913 def test_cert_time_to_seconds_timezone(self):
914 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
915 # results if local timezone is not UTC
916 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
917 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
918
919 def test_cert_time_to_seconds(self):
920 timestring = "Jan 5 09:34:43 2018 GMT"
921 ts = 1515144883.0
922 self.cert_time_ok(timestring, ts)
923 # accept keyword parameter, assert its name
924 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
925 # accept both %e and %d (space or zero generated by strftime)
926 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
927 # case-insensitive
928 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
929 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
930 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
931 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
932 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
933 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
934 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
935 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
936
937 newyear_ts = 1230768000.0
938 # leap seconds
939 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
940 # same timestamp
941 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
942
943 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
944 # allow 60th second (even if it is not a leap second)
945 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
946 # allow 2nd leap second for compatibility with time.strptime()
947 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
948 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
949
Mike53f7a7c2017-12-14 14:04:53 +0300950 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200951 # 99991231235959Z (rfc 5280)
952 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
953
954 @support.run_with_locale('LC_ALL', '')
955 def test_cert_time_to_seconds_locale(self):
956 # `cert_time_to_seconds()` should be locale independent
957
958 def local_february_name():
959 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
960
961 if local_february_name().lower() == 'feb':
962 self.skipTest("locale-specific month name needs to be "
963 "different from C locale")
964
965 # locale-independent
966 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
967 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
968
Martin Panter3840b2a2016-03-27 01:53:46 +0000969 def test_connect_ex_error(self):
970 server = socket.socket(socket.AF_INET)
971 self.addCleanup(server.close)
972 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200973 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000974 cert_reqs=ssl.CERT_REQUIRED)
975 self.addCleanup(s.close)
976 rc = s.connect_ex((HOST, port))
977 # Issue #19919: Windows machines or VMs hosted on Windows
978 # machines sometimes return EWOULDBLOCK.
979 errors = (
980 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
981 errno.EWOULDBLOCK,
982 )
983 self.assertIn(rc, errors)
984
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100985
Antoine Pitrou152efa22010-05-16 18:19:27 +0000986class ContextTests(unittest.TestCase):
987
988 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100989 for protocol in PROTOCOLS:
990 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200991 ctx = ssl.SSLContext()
992 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000993 self.assertRaises(ValueError, ssl.SSLContext, -1)
994 self.assertRaises(ValueError, ssl.SSLContext, 42)
995
996 def test_protocol(self):
997 for proto in PROTOCOLS:
998 ctx = ssl.SSLContext(proto)
999 self.assertEqual(ctx.protocol, proto)
1000
1001 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001002 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001003 ctx.set_ciphers("ALL")
1004 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +00001005 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +00001006 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +00001007
Christian Heimes892d66e2018-01-29 14:10:18 +01001008 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
1009 "Test applies only to Python default ciphers")
1010 def test_python_ciphers(self):
1011 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1012 ciphers = ctx.get_ciphers()
1013 for suite in ciphers:
1014 name = suite['name']
1015 self.assertNotIn("PSK", name)
1016 self.assertNotIn("SRP", name)
1017 self.assertNotIn("MD5", name)
1018 self.assertNotIn("RC4", name)
1019 self.assertNotIn("3DES", name)
1020
Christian Heimes25bfcd52016-09-06 00:04:45 +02001021 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
1022 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +02001024 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001025 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001026 self.assertIn('AES256-GCM-SHA384', names)
1027 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001028
Antoine Pitroub5218772010-05-21 09:56:06 +00001029 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001030 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001031 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001032 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001033 # SSLContext also enables these by default
1034 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001035 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1036 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001037 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001038 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001039 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001040 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001041 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1042 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001043 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001044 # Ubuntu has OP_NO_SSLv3 forced on by default
1045 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001046 else:
1047 with self.assertRaises(ValueError):
1048 ctx.options = 0
1049
Christian Heimesa170fa12017-09-15 20:27:30 +02001050 def test_verify_mode_protocol(self):
1051 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001052 # Default value
1053 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1054 ctx.verify_mode = ssl.CERT_OPTIONAL
1055 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1056 ctx.verify_mode = ssl.CERT_REQUIRED
1057 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1058 ctx.verify_mode = ssl.CERT_NONE
1059 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1060 with self.assertRaises(TypeError):
1061 ctx.verify_mode = None
1062 with self.assertRaises(ValueError):
1063 ctx.verify_mode = 42
1064
Christian Heimesa170fa12017-09-15 20:27:30 +02001065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1066 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1067 self.assertFalse(ctx.check_hostname)
1068
1069 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1070 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1071 self.assertTrue(ctx.check_hostname)
1072
Christian Heimes61d478c2018-01-27 15:51:38 +01001073 def test_hostname_checks_common_name(self):
1074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1075 self.assertTrue(ctx.hostname_checks_common_name)
1076 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1077 ctx.hostname_checks_common_name = True
1078 self.assertTrue(ctx.hostname_checks_common_name)
1079 ctx.hostname_checks_common_name = False
1080 self.assertFalse(ctx.hostname_checks_common_name)
1081 ctx.hostname_checks_common_name = True
1082 self.assertTrue(ctx.hostname_checks_common_name)
1083 else:
1084 with self.assertRaises(AttributeError):
1085 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001086
Christian Heimes698dde12018-02-27 11:54:43 +01001087 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1088 "required OpenSSL 1.1.0g")
1089 def test_min_max_version(self):
1090 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes34de2d32019-01-18 16:09:30 +01001091 # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
1092 # Fedora override the setting to TLS 1.0.
1093 self.assertIn(
1094 ctx.minimum_version,
1095 {ssl.TLSVersion.MINIMUM_SUPPORTED, ssl.TLSVersion.TLSv1}
Christian Heimes698dde12018-02-27 11:54:43 +01001096 )
1097 self.assertEqual(
1098 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1099 )
1100
1101 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1102 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1103 self.assertEqual(
1104 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1105 )
1106 self.assertEqual(
1107 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1108 )
1109
1110 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1111 ctx.maximum_version = ssl.TLSVersion.TLSv1
1112 self.assertEqual(
1113 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1114 )
1115 self.assertEqual(
1116 ctx.maximum_version, ssl.TLSVersion.TLSv1
1117 )
1118
1119 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1120 self.assertEqual(
1121 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1122 )
1123
1124 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1125 self.assertIn(
1126 ctx.maximum_version,
1127 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1128 )
1129
1130 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1131 self.assertIn(
1132 ctx.minimum_version,
1133 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1134 )
1135
1136 with self.assertRaises(ValueError):
1137 ctx.minimum_version = 42
1138
1139 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1140
1141 self.assertEqual(
1142 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1143 )
1144 self.assertEqual(
1145 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1146 )
1147 with self.assertRaises(ValueError):
1148 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1149 with self.assertRaises(ValueError):
1150 ctx.maximum_version = ssl.TLSVersion.TLSv1
1151
1152
Christian Heimes2427b502013-11-23 11:24:32 +01001153 @unittest.skipUnless(have_verify_flags(),
1154 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001155 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001156 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001157 # default value
1158 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1159 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001160 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1161 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1162 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1163 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1164 ctx.verify_flags = ssl.VERIFY_DEFAULT
1165 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1166 # supports any value
1167 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1168 self.assertEqual(ctx.verify_flags,
1169 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1170 with self.assertRaises(TypeError):
1171 ctx.verify_flags = None
1172
Antoine Pitrou152efa22010-05-16 18:19:27 +00001173 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001174 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001175 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001176 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001177 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1178 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001179 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001180 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001181 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001182 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001183 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001184 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001185 ctx.load_cert_chain(EMPTYCERT)
1186 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001187 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001188 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1189 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1190 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001191 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001192 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001193 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001194 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001195 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001196 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1197 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001198 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001199 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001200 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001201 # Password protected key and cert
1202 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1203 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1204 ctx.load_cert_chain(CERTFILE_PROTECTED,
1205 password=bytearray(KEY_PASSWORD.encode()))
1206 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1207 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1208 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1209 bytearray(KEY_PASSWORD.encode()))
1210 with self.assertRaisesRegex(TypeError, "should be a string"):
1211 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1212 with self.assertRaises(ssl.SSLError):
1213 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1214 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1215 # openssl has a fixed limit on the password buffer.
1216 # PEM_BUFSIZE is generally set to 1kb.
1217 # Return a string larger than this.
1218 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1219 # Password callback
1220 def getpass_unicode():
1221 return KEY_PASSWORD
1222 def getpass_bytes():
1223 return KEY_PASSWORD.encode()
1224 def getpass_bytearray():
1225 return bytearray(KEY_PASSWORD.encode())
1226 def getpass_badpass():
1227 return "badpass"
1228 def getpass_huge():
1229 return b'a' * (1024 * 1024)
1230 def getpass_bad_type():
1231 return 9
1232 def getpass_exception():
1233 raise Exception('getpass error')
1234 class GetPassCallable:
1235 def __call__(self):
1236 return KEY_PASSWORD
1237 def getpass(self):
1238 return KEY_PASSWORD
1239 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1240 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1241 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1242 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1243 ctx.load_cert_chain(CERTFILE_PROTECTED,
1244 password=GetPassCallable().getpass)
1245 with self.assertRaises(ssl.SSLError):
1246 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1247 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1248 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1249 with self.assertRaisesRegex(TypeError, "must return a string"):
1250 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1251 with self.assertRaisesRegex(Exception, "getpass error"):
1252 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1253 # Make sure the password function isn't called if it isn't needed
1254 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001255
1256 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001257 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001258 ctx.load_verify_locations(CERTFILE)
1259 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1260 ctx.load_verify_locations(BYTES_CERTFILE)
1261 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1262 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001263 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001264 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001265 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001266 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001267 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001268 ctx.load_verify_locations(BADCERT)
1269 ctx.load_verify_locations(CERTFILE, CAPATH)
1270 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1271
Victor Stinner80f75e62011-01-29 11:31:20 +00001272 # Issue #10989: crash if the second argument type is invalid
1273 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1274
Christian Heimesefff7062013-11-21 03:35:02 +01001275 def test_load_verify_cadata(self):
1276 # test cadata
1277 with open(CAFILE_CACERT) as f:
1278 cacert_pem = f.read()
1279 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1280 with open(CAFILE_NEURONIO) as f:
1281 neuronio_pem = f.read()
1282 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1283
1284 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001285 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001286 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1287 ctx.load_verify_locations(cadata=cacert_pem)
1288 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1289 ctx.load_verify_locations(cadata=neuronio_pem)
1290 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1291 # cert already in hash table
1292 ctx.load_verify_locations(cadata=neuronio_pem)
1293 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1294
1295 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001296 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001297 combined = "\n".join((cacert_pem, neuronio_pem))
1298 ctx.load_verify_locations(cadata=combined)
1299 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1300
1301 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001302 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001303 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1304 neuronio_pem, "tail"]
1305 ctx.load_verify_locations(cadata="\n".join(combined))
1306 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1307
1308 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001310 ctx.load_verify_locations(cadata=cacert_der)
1311 ctx.load_verify_locations(cadata=neuronio_der)
1312 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1313 # cert already in hash table
1314 ctx.load_verify_locations(cadata=cacert_der)
1315 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1316
1317 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001318 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001319 combined = b"".join((cacert_der, neuronio_der))
1320 ctx.load_verify_locations(cadata=combined)
1321 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1322
1323 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001324 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001325 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1326
1327 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1328 ctx.load_verify_locations(cadata="broken")
1329 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1330 ctx.load_verify_locations(cadata=b"broken")
1331
1332
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001333 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001334 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001335 ctx.load_dh_params(DHFILE)
1336 if os.name != 'nt':
1337 ctx.load_dh_params(BYTES_DHFILE)
1338 self.assertRaises(TypeError, ctx.load_dh_params)
1339 self.assertRaises(TypeError, ctx.load_dh_params, None)
1340 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001341 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001342 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001343 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001344 ctx.load_dh_params(CERTFILE)
1345
Antoine Pitroub0182c82010-10-12 20:09:02 +00001346 def test_session_stats(self):
1347 for proto in PROTOCOLS:
1348 ctx = ssl.SSLContext(proto)
1349 self.assertEqual(ctx.session_stats(), {
1350 'number': 0,
1351 'connect': 0,
1352 'connect_good': 0,
1353 'connect_renegotiate': 0,
1354 'accept': 0,
1355 'accept_good': 0,
1356 'accept_renegotiate': 0,
1357 'hits': 0,
1358 'misses': 0,
1359 'timeouts': 0,
1360 'cache_full': 0,
1361 })
1362
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001363 def test_set_default_verify_paths(self):
1364 # There's not much we can do to test that it acts as expected,
1365 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001366 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001367 ctx.set_default_verify_paths()
1368
Antoine Pitrou501da612011-12-21 09:27:41 +01001369 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001370 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001371 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001372 ctx.set_ecdh_curve("prime256v1")
1373 ctx.set_ecdh_curve(b"prime256v1")
1374 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1375 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1376 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1377 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1378
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001379 @needs_sni
1380 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001381 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001382
1383 # set_servername_callback expects a callable, or None
1384 self.assertRaises(TypeError, ctx.set_servername_callback)
1385 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1386 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1387 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1388
1389 def dummycallback(sock, servername, ctx):
1390 pass
1391 ctx.set_servername_callback(None)
1392 ctx.set_servername_callback(dummycallback)
1393
1394 @needs_sni
1395 def test_sni_callback_refcycle(self):
1396 # Reference cycles through the servername callback are detected
1397 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001398 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001399 def dummycallback(sock, servername, ctx, cycle=ctx):
1400 pass
1401 ctx.set_servername_callback(dummycallback)
1402 wr = weakref.ref(ctx)
1403 del ctx, dummycallback
1404 gc.collect()
1405 self.assertIs(wr(), None)
1406
Christian Heimes9a5395a2013-06-17 15:44:12 +02001407 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001408 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001409 self.assertEqual(ctx.cert_store_stats(),
1410 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1411 ctx.load_cert_chain(CERTFILE)
1412 self.assertEqual(ctx.cert_store_stats(),
1413 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1414 ctx.load_verify_locations(CERTFILE)
1415 self.assertEqual(ctx.cert_store_stats(),
1416 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001417 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001418 self.assertEqual(ctx.cert_store_stats(),
1419 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1420
1421 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001423 self.assertEqual(ctx.get_ca_certs(), [])
1424 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1425 ctx.load_verify_locations(CERTFILE)
1426 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001427 # but CAFILE_CACERT is a CA cert
1428 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001429 self.assertEqual(ctx.get_ca_certs(),
1430 [{'issuer': ((('organizationName', 'Root CA'),),
1431 (('organizationalUnitName', 'http://www.cacert.org'),),
1432 (('commonName', 'CA Cert Signing Authority'),),
1433 (('emailAddress', 'support@cacert.org'),)),
1434 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1435 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1436 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001437 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001438 'subject': ((('organizationName', 'Root CA'),),
1439 (('organizationalUnitName', 'http://www.cacert.org'),),
1440 (('commonName', 'CA Cert Signing Authority'),),
1441 (('emailAddress', 'support@cacert.org'),)),
1442 'version': 3}])
1443
Martin Panterb55f8b72016-01-14 12:53:56 +00001444 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001445 pem = f.read()
1446 der = ssl.PEM_cert_to_DER_cert(pem)
1447 self.assertEqual(ctx.get_ca_certs(True), [der])
1448
Christian Heimes72d28502013-11-23 13:56:58 +01001449 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001450 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001451 ctx.load_default_certs()
1452
Christian Heimesa170fa12017-09-15 20:27:30 +02001453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001454 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1455 ctx.load_default_certs()
1456
Christian Heimesa170fa12017-09-15 20:27:30 +02001457 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001458 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1459
Christian Heimesa170fa12017-09-15 20:27:30 +02001460 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001461 self.assertRaises(TypeError, ctx.load_default_certs, None)
1462 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1463
Benjamin Peterson91244e02014-10-03 18:17:15 -04001464 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001465 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001466 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001467 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001468 with support.EnvironmentVarGuard() as env:
1469 env["SSL_CERT_DIR"] = CAPATH
1470 env["SSL_CERT_FILE"] = CERTFILE
1471 ctx.load_default_certs()
1472 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1473
Benjamin Peterson91244e02014-10-03 18:17:15 -04001474 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001475 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001476 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001477 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001478 ctx.load_default_certs()
1479 stats = ctx.cert_store_stats()
1480
Christian Heimesa170fa12017-09-15 20:27:30 +02001481 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001482 with support.EnvironmentVarGuard() as env:
1483 env["SSL_CERT_DIR"] = CAPATH
1484 env["SSL_CERT_FILE"] = CERTFILE
1485 ctx.load_default_certs()
1486 stats["x509"] += 1
1487 self.assertEqual(ctx.cert_store_stats(), stats)
1488
Christian Heimes358cfd42016-09-10 22:43:48 +02001489 def _assert_context_options(self, ctx):
1490 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1491 if OP_NO_COMPRESSION != 0:
1492 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1493 OP_NO_COMPRESSION)
1494 if OP_SINGLE_DH_USE != 0:
1495 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1496 OP_SINGLE_DH_USE)
1497 if OP_SINGLE_ECDH_USE != 0:
1498 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1499 OP_SINGLE_ECDH_USE)
1500 if OP_CIPHER_SERVER_PREFERENCE != 0:
1501 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1502 OP_CIPHER_SERVER_PREFERENCE)
1503
Christian Heimes4c05b472013-11-23 15:58:30 +01001504 def test_create_default_context(self):
1505 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001506
Christian Heimesa170fa12017-09-15 20:27:30 +02001507 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001508 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001509 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001510 self._assert_context_options(ctx)
1511
Christian Heimes4c05b472013-11-23 15:58:30 +01001512 with open(SIGNING_CA) as f:
1513 cadata = f.read()
1514 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1515 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001516 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001517 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001518 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001519
1520 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001521 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001522 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001523 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001524
Christian Heimes67986f92013-11-23 22:43:47 +01001525 def test__create_stdlib_context(self):
1526 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001527 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001528 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001529 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001530 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001531
1532 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1533 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1534 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001535 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001536
1537 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001538 cert_reqs=ssl.CERT_REQUIRED,
1539 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001540 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1541 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001542 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001543 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001544
1545 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001546 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001547 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001548 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001549
Christian Heimes1aa9a752013-12-02 02:41:19 +01001550 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001551 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001552 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001553 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001554
Christian Heimese82c0342017-09-15 20:29:57 +02001555 # Auto set CERT_REQUIRED
1556 ctx.check_hostname = True
1557 self.assertTrue(ctx.check_hostname)
1558 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1559 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001560 ctx.verify_mode = ssl.CERT_REQUIRED
1561 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001562 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563
Christian Heimese82c0342017-09-15 20:29:57 +02001564 # Changing verify_mode does not affect check_hostname
1565 ctx.check_hostname = False
1566 ctx.verify_mode = ssl.CERT_NONE
1567 ctx.check_hostname = False
1568 self.assertFalse(ctx.check_hostname)
1569 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1570 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001571 ctx.check_hostname = True
1572 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001573 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1574
1575 ctx.check_hostname = False
1576 ctx.verify_mode = ssl.CERT_OPTIONAL
1577 ctx.check_hostname = False
1578 self.assertFalse(ctx.check_hostname)
1579 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1580 # keep CERT_OPTIONAL
1581 ctx.check_hostname = True
1582 self.assertTrue(ctx.check_hostname)
1583 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001584
1585 # Cannot set CERT_NONE with check_hostname enabled
1586 with self.assertRaises(ValueError):
1587 ctx.verify_mode = ssl.CERT_NONE
1588 ctx.check_hostname = False
1589 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001590 ctx.verify_mode = ssl.CERT_NONE
1591 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001592
Christian Heimes5fe668c2016-09-12 00:01:11 +02001593 def test_context_client_server(self):
1594 # PROTOCOL_TLS_CLIENT has sane defaults
1595 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1596 self.assertTrue(ctx.check_hostname)
1597 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1598
1599 # PROTOCOL_TLS_SERVER has different but also sane defaults
1600 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1601 self.assertFalse(ctx.check_hostname)
1602 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1603
Christian Heimes4df60f12017-09-15 20:26:05 +02001604 def test_context_custom_class(self):
1605 class MySSLSocket(ssl.SSLSocket):
1606 pass
1607
1608 class MySSLObject(ssl.SSLObject):
1609 pass
1610
1611 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1612 ctx.sslsocket_class = MySSLSocket
1613 ctx.sslobject_class = MySSLObject
1614
1615 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1616 self.assertIsInstance(sock, MySSLSocket)
1617 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1618 self.assertIsInstance(obj, MySSLObject)
1619
Antoine Pitrou152efa22010-05-16 18:19:27 +00001620
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001621class SSLErrorTests(unittest.TestCase):
1622
1623 def test_str(self):
1624 # The str() of a SSLError doesn't include the errno
1625 e = ssl.SSLError(1, "foo")
1626 self.assertEqual(str(e), "foo")
1627 self.assertEqual(e.errno, 1)
1628 # Same for a subclass
1629 e = ssl.SSLZeroReturnError(1, "foo")
1630 self.assertEqual(str(e), "foo")
1631 self.assertEqual(e.errno, 1)
1632
1633 def test_lib_reason(self):
1634 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001635 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001636 with self.assertRaises(ssl.SSLError) as cm:
1637 ctx.load_dh_params(CERTFILE)
1638 self.assertEqual(cm.exception.library, 'PEM')
1639 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1640 s = str(cm.exception)
1641 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1642
1643 def test_subclass(self):
1644 # Check that the appropriate SSLError subclass is raised
1645 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001646 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1647 ctx.check_hostname = False
1648 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001649 with socket.socket() as s:
1650 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001651 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001652 c = socket.socket()
1653 c.connect(s.getsockname())
1654 c.setblocking(False)
1655 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001656 with self.assertRaises(ssl.SSLWantReadError) as cm:
1657 c.do_handshake()
1658 s = str(cm.exception)
1659 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1660 # For compatibility
1661 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1662
1663
Christian Heimes61d478c2018-01-27 15:51:38 +01001664 def test_bad_server_hostname(self):
1665 ctx = ssl.create_default_context()
1666 with self.assertRaises(ValueError):
1667 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1668 server_hostname="")
1669 with self.assertRaises(ValueError):
1670 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1671 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001672 with self.assertRaises(TypeError):
1673 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1674 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001675
1676
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001677class MemoryBIOTests(unittest.TestCase):
1678
1679 def test_read_write(self):
1680 bio = ssl.MemoryBIO()
1681 bio.write(b'foo')
1682 self.assertEqual(bio.read(), b'foo')
1683 self.assertEqual(bio.read(), b'')
1684 bio.write(b'foo')
1685 bio.write(b'bar')
1686 self.assertEqual(bio.read(), b'foobar')
1687 self.assertEqual(bio.read(), b'')
1688 bio.write(b'baz')
1689 self.assertEqual(bio.read(2), b'ba')
1690 self.assertEqual(bio.read(1), b'z')
1691 self.assertEqual(bio.read(1), b'')
1692
1693 def test_eof(self):
1694 bio = ssl.MemoryBIO()
1695 self.assertFalse(bio.eof)
1696 self.assertEqual(bio.read(), b'')
1697 self.assertFalse(bio.eof)
1698 bio.write(b'foo')
1699 self.assertFalse(bio.eof)
1700 bio.write_eof()
1701 self.assertFalse(bio.eof)
1702 self.assertEqual(bio.read(2), b'fo')
1703 self.assertFalse(bio.eof)
1704 self.assertEqual(bio.read(1), b'o')
1705 self.assertTrue(bio.eof)
1706 self.assertEqual(bio.read(), b'')
1707 self.assertTrue(bio.eof)
1708
1709 def test_pending(self):
1710 bio = ssl.MemoryBIO()
1711 self.assertEqual(bio.pending, 0)
1712 bio.write(b'foo')
1713 self.assertEqual(bio.pending, 3)
1714 for i in range(3):
1715 bio.read(1)
1716 self.assertEqual(bio.pending, 3-i-1)
1717 for i in range(3):
1718 bio.write(b'x')
1719 self.assertEqual(bio.pending, i+1)
1720 bio.read()
1721 self.assertEqual(bio.pending, 0)
1722
1723 def test_buffer_types(self):
1724 bio = ssl.MemoryBIO()
1725 bio.write(b'foo')
1726 self.assertEqual(bio.read(), b'foo')
1727 bio.write(bytearray(b'bar'))
1728 self.assertEqual(bio.read(), b'bar')
1729 bio.write(memoryview(b'baz'))
1730 self.assertEqual(bio.read(), b'baz')
1731
1732 def test_error_types(self):
1733 bio = ssl.MemoryBIO()
1734 self.assertRaises(TypeError, bio.write, 'foo')
1735 self.assertRaises(TypeError, bio.write, None)
1736 self.assertRaises(TypeError, bio.write, True)
1737 self.assertRaises(TypeError, bio.write, 1)
1738
1739
Christian Heimes9d50ab52018-02-27 10:17:30 +01001740class SSLObjectTests(unittest.TestCase):
1741 def test_private_init(self):
1742 bio = ssl.MemoryBIO()
1743 with self.assertRaisesRegex(TypeError, "public constructor"):
1744 ssl.SSLObject(bio, bio)
1745
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001746 def test_unwrap(self):
1747 client_ctx, server_ctx, hostname = testing_context()
1748 c_in = ssl.MemoryBIO()
1749 c_out = ssl.MemoryBIO()
1750 s_in = ssl.MemoryBIO()
1751 s_out = ssl.MemoryBIO()
1752 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1753 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1754
1755 # Loop on the handshake for a bit to get it settled
1756 for _ in range(5):
1757 try:
1758 client.do_handshake()
1759 except ssl.SSLWantReadError:
1760 pass
1761 if c_out.pending:
1762 s_in.write(c_out.read())
1763 try:
1764 server.do_handshake()
1765 except ssl.SSLWantReadError:
1766 pass
1767 if s_out.pending:
1768 c_in.write(s_out.read())
1769 # Now the handshakes should be complete (don't raise WantReadError)
1770 client.do_handshake()
1771 server.do_handshake()
1772
1773 # Now if we unwrap one side unilaterally, it should send close-notify
1774 # and raise WantReadError:
1775 with self.assertRaises(ssl.SSLWantReadError):
1776 client.unwrap()
1777
1778 # But server.unwrap() does not raise, because it reads the client's
1779 # close-notify:
1780 s_in.write(c_out.read())
1781 server.unwrap()
1782
1783 # And now that the client gets the server's close-notify, it doesn't
1784 # raise either.
1785 c_in.write(s_out.read())
1786 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001787
Martin Panter3840b2a2016-03-27 01:53:46 +00001788class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001789 """Tests that connect to a simple server running in the background"""
1790
1791 def setUp(self):
1792 server = ThreadedEchoServer(SIGNED_CERTFILE)
1793 self.server_addr = (HOST, server.port)
1794 server.__enter__()
1795 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001796
Antoine Pitrou480a1242010-04-28 21:37:09 +00001797 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001798 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001799 cert_reqs=ssl.CERT_NONE) as s:
1800 s.connect(self.server_addr)
1801 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001802 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001803
Martin Panter3840b2a2016-03-27 01:53:46 +00001804 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001805 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001806 cert_reqs=ssl.CERT_REQUIRED,
1807 ca_certs=SIGNING_CA) as s:
1808 s.connect(self.server_addr)
1809 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001810 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001811
Martin Panter3840b2a2016-03-27 01:53:46 +00001812 def test_connect_fail(self):
1813 # This should fail because we have no verification certs. Connection
1814 # failure crashes ThreadedEchoServer, so run this in an independent
1815 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001816 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001817 cert_reqs=ssl.CERT_REQUIRED)
1818 self.addCleanup(s.close)
1819 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1820 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001821
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001822 def test_connect_ex(self):
1823 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001824 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001825 cert_reqs=ssl.CERT_REQUIRED,
1826 ca_certs=SIGNING_CA)
1827 self.addCleanup(s.close)
1828 self.assertEqual(0, s.connect_ex(self.server_addr))
1829 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001830
1831 def test_non_blocking_connect_ex(self):
1832 # Issue #11326: non-blocking connect_ex() should allow handshake
1833 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001834 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001835 cert_reqs=ssl.CERT_REQUIRED,
1836 ca_certs=SIGNING_CA,
1837 do_handshake_on_connect=False)
1838 self.addCleanup(s.close)
1839 s.setblocking(False)
1840 rc = s.connect_ex(self.server_addr)
1841 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1842 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1843 # Wait for connect to finish
1844 select.select([], [s], [], 5.0)
1845 # Non-blocking handshake
1846 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001847 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001848 s.do_handshake()
1849 break
1850 except ssl.SSLWantReadError:
1851 select.select([s], [], [], 5.0)
1852 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001853 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001854 # SSL established
1855 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001856
Antoine Pitrou152efa22010-05-16 18:19:27 +00001857 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001858 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001859 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001860 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1861 s.connect(self.server_addr)
1862 self.assertEqual({}, s.getpeercert())
1863 # Same with a server hostname
1864 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1865 server_hostname="dummy") as s:
1866 s.connect(self.server_addr)
1867 ctx.verify_mode = ssl.CERT_REQUIRED
1868 # This should succeed because we specify the root cert
1869 ctx.load_verify_locations(SIGNING_CA)
1870 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1871 s.connect(self.server_addr)
1872 cert = s.getpeercert()
1873 self.assertTrue(cert)
1874
1875 def test_connect_with_context_fail(self):
1876 # This should fail because we have no verification certs. Connection
1877 # failure crashes ThreadedEchoServer, so run this in an independent
1878 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001879 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001880 ctx.verify_mode = ssl.CERT_REQUIRED
1881 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1882 self.addCleanup(s.close)
1883 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1884 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001885
1886 def test_connect_capath(self):
1887 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001888 # NOTE: the subject hashing algorithm has been changed between
1889 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1890 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001891 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001892 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001893 ctx.verify_mode = ssl.CERT_REQUIRED
1894 ctx.load_verify_locations(capath=CAPATH)
1895 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1896 s.connect(self.server_addr)
1897 cert = s.getpeercert()
1898 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001899
Martin Panter3840b2a2016-03-27 01:53:46 +00001900 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001901 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001902 ctx.verify_mode = ssl.CERT_REQUIRED
1903 ctx.load_verify_locations(capath=BYTES_CAPATH)
1904 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1905 s.connect(self.server_addr)
1906 cert = s.getpeercert()
1907 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001908
Christian Heimesefff7062013-11-21 03:35:02 +01001909 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001910 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001911 pem = f.read()
1912 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001913 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 ctx.verify_mode = ssl.CERT_REQUIRED
1915 ctx.load_verify_locations(cadata=pem)
1916 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1917 s.connect(self.server_addr)
1918 cert = s.getpeercert()
1919 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001920
Martin Panter3840b2a2016-03-27 01:53:46 +00001921 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001922 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001923 ctx.verify_mode = ssl.CERT_REQUIRED
1924 ctx.load_verify_locations(cadata=der)
1925 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1926 s.connect(self.server_addr)
1927 cert = s.getpeercert()
1928 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001929
Antoine Pitroue3220242010-04-24 11:13:53 +00001930 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1931 def test_makefile_close(self):
1932 # Issue #5238: creating a file-like object with makefile() shouldn't
1933 # delay closing the underlying "real socket" (here tested with its
1934 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001935 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 ss.connect(self.server_addr)
1937 fd = ss.fileno()
1938 f = ss.makefile()
1939 f.close()
1940 # The fd is still open
1941 os.read(fd, 0)
1942 # Closing the SSL socket should close the fd too
1943 ss.close()
1944 gc.collect()
1945 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001946 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001947 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001948
Antoine Pitrou480a1242010-04-28 21:37:09 +00001949 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001950 s = socket.socket(socket.AF_INET)
1951 s.connect(self.server_addr)
1952 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001953 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001954 cert_reqs=ssl.CERT_NONE,
1955 do_handshake_on_connect=False)
1956 self.addCleanup(s.close)
1957 count = 0
1958 while True:
1959 try:
1960 count += 1
1961 s.do_handshake()
1962 break
1963 except ssl.SSLWantReadError:
1964 select.select([s], [], [])
1965 except ssl.SSLWantWriteError:
1966 select.select([], [s], [])
1967 if support.verbose:
1968 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001969
Antoine Pitrou480a1242010-04-28 21:37:09 +00001970 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001971 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001972
Martin Panter3840b2a2016-03-27 01:53:46 +00001973 def test_get_server_certificate_fail(self):
1974 # Connection failure crashes ThreadedEchoServer, so run this in an
1975 # independent test method
1976 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001977
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001978 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001979 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001980 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1981 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001982 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001983 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1984 s.connect(self.server_addr)
1985 # Error checking can happen at instantiation or when connecting
1986 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1987 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001988 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001989 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1990 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001991
Christian Heimes9a5395a2013-06-17 15:44:12 +02001992 def test_get_ca_certs_capath(self):
1993 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001995 ctx.load_verify_locations(capath=CAPATH)
1996 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001997 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1998 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001999 s.connect(self.server_addr)
2000 cert = s.getpeercert()
2001 self.assertTrue(cert)
2002 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02002003
Christian Heimes575596e2013-12-15 21:49:17 +01002004 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01002005 def test_context_setget(self):
2006 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02002007 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2008 ctx1.load_verify_locations(capath=CAPATH)
2009 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2010 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00002011 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02002012 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00002013 ss.connect(self.server_addr)
2014 self.assertIs(ss.context, ctx1)
2015 self.assertIs(ss._sslobj.context, ctx1)
2016 ss.context = ctx2
2017 self.assertIs(ss.context, ctx2)
2018 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002019
2020 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
2021 # A simple IO loop. Call func(*args) depending on the error we get
2022 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
2023 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07002024 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002025 count = 0
2026 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07002027 if time.monotonic() > deadline:
2028 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002029 errno = None
2030 count += 1
2031 try:
2032 ret = func(*args)
2033 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002034 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002035 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002036 raise
2037 errno = e.errno
2038 # Get any data from the outgoing BIO irrespective of any error, and
2039 # send it to the socket.
2040 buf = outgoing.read()
2041 sock.sendall(buf)
2042 # If there's no error, we're done. For WANT_READ, we need to get
2043 # data from the socket and put it in the incoming BIO.
2044 if errno is None:
2045 break
2046 elif errno == ssl.SSL_ERROR_WANT_READ:
2047 buf = sock.recv(32768)
2048 if buf:
2049 incoming.write(buf)
2050 else:
2051 incoming.write_eof()
2052 if support.verbose:
2053 sys.stdout.write("Needed %d calls to complete %s().\n"
2054 % (count, func.__name__))
2055 return ret
2056
Martin Panter3840b2a2016-03-27 01:53:46 +00002057 def test_bio_handshake(self):
2058 sock = socket.socket(socket.AF_INET)
2059 self.addCleanup(sock.close)
2060 sock.connect(self.server_addr)
2061 incoming = ssl.MemoryBIO()
2062 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002063 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2064 self.assertTrue(ctx.check_hostname)
2065 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002066 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002067 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2068 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002069 self.assertIs(sslobj._sslobj.owner, sslobj)
2070 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002071 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002072 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002073 self.assertRaises(ValueError, sslobj.getpeercert)
2074 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2075 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2076 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2077 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002078 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002079 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002080 self.assertTrue(sslobj.getpeercert())
2081 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2082 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2083 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002084 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002085 except ssl.SSLSyscallError:
2086 # If the server shuts down the TCP connection without sending a
2087 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2088 pass
2089 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2090
2091 def test_bio_read_write_data(self):
2092 sock = socket.socket(socket.AF_INET)
2093 self.addCleanup(sock.close)
2094 sock.connect(self.server_addr)
2095 incoming = ssl.MemoryBIO()
2096 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002097 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002098 ctx.verify_mode = ssl.CERT_NONE
2099 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2100 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2101 req = b'FOO\n'
2102 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2103 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2104 self.assertEqual(buf, b'foo\n')
2105 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002106
2107
Martin Panter3840b2a2016-03-27 01:53:46 +00002108class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002109
Martin Panter3840b2a2016-03-27 01:53:46 +00002110 def test_timeout_connect_ex(self):
2111 # Issue #12065: on a timeout, connect_ex() should return the original
2112 # errno (mimicking the behaviour of non-SSL sockets).
2113 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002114 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002115 cert_reqs=ssl.CERT_REQUIRED,
2116 do_handshake_on_connect=False)
2117 self.addCleanup(s.close)
2118 s.settimeout(0.0000001)
2119 rc = s.connect_ex((REMOTE_HOST, 443))
2120 if rc == 0:
2121 self.skipTest("REMOTE_HOST responded too quickly")
2122 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2123
2124 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2125 def test_get_server_certificate_ipv6(self):
2126 with support.transient_internet('ipv6.google.com'):
2127 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2128 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2129
Martin Panter3840b2a2016-03-27 01:53:46 +00002130
2131def _test_get_server_certificate(test, host, port, cert=None):
2132 pem = ssl.get_server_certificate((host, port))
2133 if not pem:
2134 test.fail("No server certificate on %s:%s!" % (host, port))
2135
2136 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2137 if not pem:
2138 test.fail("No server certificate on %s:%s!" % (host, port))
2139 if support.verbose:
2140 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2141
2142def _test_get_server_certificate_fail(test, host, port):
2143 try:
2144 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2145 except ssl.SSLError as x:
2146 #should fail
2147 if support.verbose:
2148 sys.stdout.write("%s\n" % x)
2149 else:
2150 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2151
2152
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002153from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002154
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002155class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002156
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002157 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002158
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002159 """A mildly complicated class, because we want it to work both
2160 with and without the SSL wrapper around the socket connection, so
2161 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002162
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002163 def __init__(self, server, connsock, addr):
2164 self.server = server
2165 self.running = False
2166 self.sock = connsock
2167 self.addr = addr
2168 self.sock.setblocking(1)
2169 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002170 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002171 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002172
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002173 def wrap_conn(self):
2174 try:
2175 self.sslconn = self.server.context.wrap_socket(
2176 self.sock, server_side=True)
2177 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2178 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Christian Heimes529525f2018-05-23 22:24:45 +02002179 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002180 # We treat ConnectionResetError as though it were an
2181 # SSLError - OpenSSL on Ubuntu abruptly closes the
2182 # connection when asked to use an unsupported protocol.
2183 #
Christian Heimes529525f2018-05-23 22:24:45 +02002184 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2185 # tries to send session tickets after handshake.
2186 # https://github.com/openssl/openssl/issues/6342
2187 self.server.conn_errors.append(str(e))
2188 if self.server.chatty:
2189 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2190 self.running = False
2191 self.close()
2192 return False
2193 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002194 # OSError may occur with wrong protocols, e.g. both
2195 # sides use PROTOCOL_TLS_SERVER.
2196 #
2197 # XXX Various errors can have happened here, for example
2198 # a mismatching protocol version, an invalid certificate,
2199 # or a low-level bug. This should be made more discriminating.
2200 #
2201 # bpo-31323: Store the exception as string to prevent
2202 # a reference leak: server -> conn_errors -> exception
2203 # -> traceback -> self (ConnectionHandler) -> server
2204 self.server.conn_errors.append(str(e))
2205 if self.server.chatty:
2206 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2207 self.running = False
2208 self.server.stop()
2209 self.close()
2210 return False
2211 else:
2212 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2213 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2214 cert = self.sslconn.getpeercert()
2215 if support.verbose and self.server.chatty:
2216 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2217 cert_binary = self.sslconn.getpeercert(True)
2218 if support.verbose and self.server.chatty:
2219 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2220 cipher = self.sslconn.cipher()
2221 if support.verbose and self.server.chatty:
2222 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2223 sys.stdout.write(" server: selected protocol is now "
2224 + str(self.sslconn.selected_npn_protocol()) + "\n")
2225 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002226
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002227 def read(self):
2228 if self.sslconn:
2229 return self.sslconn.read()
2230 else:
2231 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002232
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002233 def write(self, bytes):
2234 if self.sslconn:
2235 return self.sslconn.write(bytes)
2236 else:
2237 return self.sock.send(bytes)
2238
2239 def close(self):
2240 if self.sslconn:
2241 self.sslconn.close()
2242 else:
2243 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002244
Antoine Pitrou480a1242010-04-28 21:37:09 +00002245 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002246 self.running = True
2247 if not self.server.starttls_server:
2248 if not self.wrap_conn():
2249 return
2250 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002251 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002252 msg = self.read()
2253 stripped = msg.strip()
2254 if not stripped:
2255 # eof, so quit this handler
2256 self.running = False
2257 try:
2258 self.sock = self.sslconn.unwrap()
2259 except OSError:
2260 # Many tests shut the TCP connection down
2261 # without an SSL shutdown. This causes
2262 # unwrap() to raise OSError with errno=0!
2263 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002264 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002265 self.sslconn = None
2266 self.close()
2267 elif stripped == b'over':
2268 if support.verbose and self.server.connectionchatty:
2269 sys.stdout.write(" server: client closed connection\n")
2270 self.close()
2271 return
2272 elif (self.server.starttls_server and
2273 stripped == b'STARTTLS'):
2274 if support.verbose and self.server.connectionchatty:
2275 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2276 self.write(b"OK\n")
2277 if not self.wrap_conn():
2278 return
2279 elif (self.server.starttls_server and self.sslconn
2280 and stripped == b'ENDTLS'):
2281 if support.verbose and self.server.connectionchatty:
2282 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2283 self.write(b"OK\n")
2284 self.sock = self.sslconn.unwrap()
2285 self.sslconn = None
2286 if support.verbose and self.server.connectionchatty:
2287 sys.stdout.write(" server: connection is now unencrypted...\n")
2288 elif stripped == b'CB tls-unique':
2289 if support.verbose and self.server.connectionchatty:
2290 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2291 data = self.sslconn.get_channel_binding("tls-unique")
2292 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002293 elif stripped == b'PHA':
2294 if support.verbose and self.server.connectionchatty:
2295 sys.stdout.write(" server: initiating post handshake auth\n")
2296 try:
2297 self.sslconn.verify_client_post_handshake()
2298 except ssl.SSLError as e:
2299 self.write(repr(e).encode("us-ascii") + b"\n")
2300 else:
2301 self.write(b"OK\n")
2302 elif stripped == b'HASCERT':
2303 if self.sslconn.getpeercert() is not None:
2304 self.write(b'TRUE\n')
2305 else:
2306 self.write(b'FALSE\n')
2307 elif stripped == b'GETCERT':
2308 cert = self.sslconn.getpeercert()
2309 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002310 else:
2311 if (support.verbose and
2312 self.server.connectionchatty):
2313 ctype = (self.sslconn and "encrypted") or "unencrypted"
2314 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2315 % (msg, ctype, msg.lower(), ctype))
2316 self.write(msg.lower())
Christian Heimes529525f2018-05-23 22:24:45 +02002317 except ConnectionResetError:
2318 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2319 # when connection is not shut down gracefully.
2320 if self.server.chatty and support.verbose:
2321 sys.stdout.write(
2322 " Connection reset by peer: {}\n".format(
2323 self.addr)
2324 )
2325 self.close()
2326 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 except OSError:
2328 if self.server.chatty:
2329 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002330 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002331 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002332
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002333 # normally, we'd just stop here, but for the test
2334 # harness, we want to stop the server
2335 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002336
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002337 def __init__(self, certificate=None, ssl_version=None,
2338 certreqs=None, cacerts=None,
2339 chatty=True, connectionchatty=False, starttls_server=False,
2340 npn_protocols=None, alpn_protocols=None,
2341 ciphers=None, context=None):
2342 if context:
2343 self.context = context
2344 else:
2345 self.context = ssl.SSLContext(ssl_version
2346 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002347 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002348 self.context.verify_mode = (certreqs if certreqs is not None
2349 else ssl.CERT_NONE)
2350 if cacerts:
2351 self.context.load_verify_locations(cacerts)
2352 if certificate:
2353 self.context.load_cert_chain(certificate)
2354 if npn_protocols:
2355 self.context.set_npn_protocols(npn_protocols)
2356 if alpn_protocols:
2357 self.context.set_alpn_protocols(alpn_protocols)
2358 if ciphers:
2359 self.context.set_ciphers(ciphers)
2360 self.chatty = chatty
2361 self.connectionchatty = connectionchatty
2362 self.starttls_server = starttls_server
2363 self.sock = socket.socket()
2364 self.port = support.bind_port(self.sock)
2365 self.flag = None
2366 self.active = False
2367 self.selected_npn_protocols = []
2368 self.selected_alpn_protocols = []
2369 self.shared_ciphers = []
2370 self.conn_errors = []
2371 threading.Thread.__init__(self)
2372 self.daemon = True
2373
2374 def __enter__(self):
2375 self.start(threading.Event())
2376 self.flag.wait()
2377 return self
2378
2379 def __exit__(self, *args):
2380 self.stop()
2381 self.join()
2382
2383 def start(self, flag=None):
2384 self.flag = flag
2385 threading.Thread.start(self)
2386
2387 def run(self):
2388 self.sock.settimeout(0.05)
2389 self.sock.listen()
2390 self.active = True
2391 if self.flag:
2392 # signal an event
2393 self.flag.set()
2394 while self.active:
2395 try:
2396 newconn, connaddr = self.sock.accept()
2397 if support.verbose and self.chatty:
2398 sys.stdout.write(' server: new connection from '
2399 + repr(connaddr) + '\n')
2400 handler = self.ConnectionHandler(self, newconn, connaddr)
2401 handler.start()
2402 handler.join()
2403 except socket.timeout:
2404 pass
2405 except KeyboardInterrupt:
2406 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002407 except BaseException as e:
2408 if support.verbose and self.chatty:
2409 sys.stdout.write(
2410 ' connection handling failed: ' + repr(e) + '\n')
2411
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002412 self.sock.close()
2413
2414 def stop(self):
2415 self.active = False
2416
2417class AsyncoreEchoServer(threading.Thread):
2418
2419 # this one's based on asyncore.dispatcher
2420
2421 class EchoServer (asyncore.dispatcher):
2422
2423 class ConnectionHandler(asyncore.dispatcher_with_send):
2424
2425 def __init__(self, conn, certfile):
2426 self.socket = test_wrap_socket(conn, server_side=True,
2427 certfile=certfile,
2428 do_handshake_on_connect=False)
2429 asyncore.dispatcher_with_send.__init__(self, self.socket)
2430 self._ssl_accepting = True
2431 self._do_ssl_handshake()
2432
2433 def readable(self):
2434 if isinstance(self.socket, ssl.SSLSocket):
2435 while self.socket.pending() > 0:
2436 self.handle_read_event()
2437 return True
2438
2439 def _do_ssl_handshake(self):
2440 try:
2441 self.socket.do_handshake()
2442 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2443 return
2444 except ssl.SSLEOFError:
2445 return self.handle_close()
2446 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002447 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002448 except OSError as err:
2449 if err.args[0] == errno.ECONNABORTED:
2450 return self.handle_close()
2451 else:
2452 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002453
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002454 def handle_read(self):
2455 if self._ssl_accepting:
2456 self._do_ssl_handshake()
2457 else:
2458 data = self.recv(1024)
2459 if support.verbose:
2460 sys.stdout.write(" server: read %s from client\n" % repr(data))
2461 if not data:
2462 self.close()
2463 else:
2464 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002465
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002466 def handle_close(self):
2467 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002468 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002469 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002470
2471 def handle_error(self):
2472 raise
2473
Trent Nelson78520002008-04-10 20:54:35 +00002474 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 self.certfile = certfile
2476 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2477 self.port = support.bind_port(sock, '')
2478 asyncore.dispatcher.__init__(self, sock)
2479 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002480
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002481 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002482 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2484 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002485
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002486 def handle_error(self):
2487 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002488
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002489 def __init__(self, certfile):
2490 self.flag = None
2491 self.active = False
2492 self.server = self.EchoServer(certfile)
2493 self.port = self.server.port
2494 threading.Thread.__init__(self)
2495 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002496
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002497 def __str__(self):
2498 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002499
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002500 def __enter__(self):
2501 self.start(threading.Event())
2502 self.flag.wait()
2503 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002504
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002505 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002506 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002507 sys.stdout.write(" cleanup: stopping server.\n")
2508 self.stop()
2509 if support.verbose:
2510 sys.stdout.write(" cleanup: joining server thread.\n")
2511 self.join()
2512 if support.verbose:
2513 sys.stdout.write(" cleanup: successfully joined.\n")
2514 # make sure that ConnectionHandler is removed from socket_map
2515 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002516
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002517 def start (self, flag=None):
2518 self.flag = flag
2519 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002520
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 def run(self):
2522 self.active = True
2523 if self.flag:
2524 self.flag.set()
2525 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002526 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 asyncore.loop(1)
2528 except:
2529 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002530
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002531 def stop(self):
2532 self.active = False
2533 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002534
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002535def server_params_test(client_context, server_context, indata=b"FOO\n",
2536 chatty=True, connectionchatty=False, sni_name=None,
2537 session=None):
2538 """
2539 Launch a server, connect a client to it and try various reads
2540 and writes.
2541 """
2542 stats = {}
2543 server = ThreadedEchoServer(context=server_context,
2544 chatty=chatty,
2545 connectionchatty=False)
2546 with server:
2547 with client_context.wrap_socket(socket.socket(),
2548 server_hostname=sni_name, session=session) as s:
2549 s.connect((HOST, server.port))
2550 for arg in [indata, bytearray(indata), memoryview(indata)]:
2551 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002552 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002553 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002554 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002555 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002556 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 if connectionchatty:
2558 if support.verbose:
2559 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002560 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002562 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2563 % (outdata[:20], len(outdata),
2564 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002565 s.write(b"over\n")
2566 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002567 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002568 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002569 stats.update({
2570 'compression': s.compression(),
2571 'cipher': s.cipher(),
2572 'peercert': s.getpeercert(),
2573 'client_alpn_protocol': s.selected_alpn_protocol(),
2574 'client_npn_protocol': s.selected_npn_protocol(),
2575 'version': s.version(),
2576 'session_reused': s.session_reused,
2577 'session': s.session,
2578 })
2579 s.close()
2580 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2581 stats['server_npn_protocols'] = server.selected_npn_protocols
2582 stats['server_shared_ciphers'] = server.shared_ciphers
2583 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002584
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002585def try_protocol_combo(server_protocol, client_protocol, expect_success,
2586 certsreqs=None, server_options=0, client_options=0):
2587 """
2588 Try to SSL-connect using *client_protocol* to *server_protocol*.
2589 If *expect_success* is true, assert that the connection succeeds,
2590 if it's false, assert that the connection fails.
2591 Also, if *expect_success* is a string, assert that it is the protocol
2592 version actually used by the connection.
2593 """
2594 if certsreqs is None:
2595 certsreqs = ssl.CERT_NONE
2596 certtype = {
2597 ssl.CERT_NONE: "CERT_NONE",
2598 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2599 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2600 }[certsreqs]
2601 if support.verbose:
2602 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2603 sys.stdout.write(formatstr %
2604 (ssl.get_protocol_name(client_protocol),
2605 ssl.get_protocol_name(server_protocol),
2606 certtype))
2607 client_context = ssl.SSLContext(client_protocol)
2608 client_context.options |= client_options
2609 server_context = ssl.SSLContext(server_protocol)
2610 server_context.options |= server_options
2611
2612 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2613 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2614 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002615 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002616 client_context.set_ciphers("ALL")
2617
2618 for ctx in (client_context, server_context):
2619 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002620 ctx.load_cert_chain(SIGNED_CERTFILE)
2621 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002622 try:
2623 stats = server_params_test(client_context, server_context,
2624 chatty=False, connectionchatty=False)
2625 # Protocol mismatch can result in either an SSLError, or a
2626 # "Connection reset by peer" error.
2627 except ssl.SSLError:
2628 if expect_success:
2629 raise
2630 except OSError as e:
2631 if expect_success or e.errno != errno.ECONNRESET:
2632 raise
2633 else:
2634 if not expect_success:
2635 raise AssertionError(
2636 "Client protocol %s succeeded with server protocol %s!"
2637 % (ssl.get_protocol_name(client_protocol),
2638 ssl.get_protocol_name(server_protocol)))
2639 elif (expect_success is not True
2640 and expect_success != stats['version']):
2641 raise AssertionError("version mismatch: expected %r, got %r"
2642 % (expect_success, stats['version']))
2643
2644
2645class ThreadedTests(unittest.TestCase):
2646
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002647 def test_echo(self):
2648 """Basic test of an SSL client connecting to a server"""
2649 if support.verbose:
2650 sys.stdout.write("\n")
2651 for protocol in PROTOCOLS:
2652 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2653 continue
2654 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2655 context = ssl.SSLContext(protocol)
2656 context.load_cert_chain(CERTFILE)
2657 server_params_test(context, context,
2658 chatty=True, connectionchatty=True)
2659
Christian Heimesa170fa12017-09-15 20:27:30 +02002660 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002661
2662 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2663 server_params_test(client_context=client_context,
2664 server_context=server_context,
2665 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002666 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002667
2668 client_context.check_hostname = False
2669 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2670 with self.assertRaises(ssl.SSLError) as e:
2671 server_params_test(client_context=server_context,
2672 server_context=client_context,
2673 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002674 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002675 self.assertIn('called a function you should not call',
2676 str(e.exception))
2677
2678 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2679 with self.assertRaises(ssl.SSLError) as e:
2680 server_params_test(client_context=server_context,
2681 server_context=server_context,
2682 chatty=True, connectionchatty=True)
2683 self.assertIn('called a function you should not call',
2684 str(e.exception))
2685
2686 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2687 with self.assertRaises(ssl.SSLError) as e:
2688 server_params_test(client_context=server_context,
2689 server_context=client_context,
2690 chatty=True, connectionchatty=True)
2691 self.assertIn('called a function you should not call',
2692 str(e.exception))
2693
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002694 def test_getpeercert(self):
2695 if support.verbose:
2696 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002697
2698 client_context, server_context, hostname = testing_context()
2699 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002700 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002701 with client_context.wrap_socket(socket.socket(),
2702 do_handshake_on_connect=False,
2703 server_hostname=hostname) as s:
2704 s.connect((HOST, server.port))
2705 # getpeercert() raise ValueError while the handshake isn't
2706 # done.
2707 with self.assertRaises(ValueError):
2708 s.getpeercert()
2709 s.do_handshake()
2710 cert = s.getpeercert()
2711 self.assertTrue(cert, "Can't get peer certificate.")
2712 cipher = s.cipher()
2713 if support.verbose:
2714 sys.stdout.write(pprint.pformat(cert) + '\n')
2715 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2716 if 'subject' not in cert:
2717 self.fail("No subject field in certificate: %s." %
2718 pprint.pformat(cert))
2719 if ((('organizationName', 'Python Software Foundation'),)
2720 not in cert['subject']):
2721 self.fail(
2722 "Missing or invalid 'organizationName' field in certificate subject; "
2723 "should be 'Python Software Foundation'.")
2724 self.assertIn('notBefore', cert)
2725 self.assertIn('notAfter', cert)
2726 before = ssl.cert_time_to_seconds(cert['notBefore'])
2727 after = ssl.cert_time_to_seconds(cert['notAfter'])
2728 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002729
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002730 @unittest.skipUnless(have_verify_flags(),
2731 "verify_flags need OpenSSL > 0.9.8")
2732 def test_crl_check(self):
2733 if support.verbose:
2734 sys.stdout.write("\n")
2735
Christian Heimesa170fa12017-09-15 20:27:30 +02002736 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002737
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002738 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002739 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002740
2741 # VERIFY_DEFAULT should pass
2742 server = ThreadedEchoServer(context=server_context, chatty=True)
2743 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002744 with client_context.wrap_socket(socket.socket(),
2745 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002746 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002747 cert = s.getpeercert()
2748 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002749
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002750 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002751 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002752
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002753 server = ThreadedEchoServer(context=server_context, chatty=True)
2754 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002755 with client_context.wrap_socket(socket.socket(),
2756 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002757 with self.assertRaisesRegex(ssl.SSLError,
2758 "certificate verify failed"):
2759 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002760
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002761 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002762 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002763
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002764 server = ThreadedEchoServer(context=server_context, chatty=True)
2765 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002766 with client_context.wrap_socket(socket.socket(),
2767 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002768 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 cert = s.getpeercert()
2770 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002771
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002772 def test_check_hostname(self):
2773 if support.verbose:
2774 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002775
Christian Heimesa170fa12017-09-15 20:27:30 +02002776 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002777
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002778 # correct hostname should verify
2779 server = ThreadedEchoServer(context=server_context, chatty=True)
2780 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002781 with client_context.wrap_socket(socket.socket(),
2782 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002783 s.connect((HOST, server.port))
2784 cert = s.getpeercert()
2785 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002786
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002787 # incorrect hostname should raise an exception
2788 server = ThreadedEchoServer(context=server_context, chatty=True)
2789 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002790 with client_context.wrap_socket(socket.socket(),
2791 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002792 with self.assertRaisesRegex(
2793 ssl.CertificateError,
2794 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002795 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002796
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002797 # missing server_hostname arg should cause an exception, too
2798 server = ThreadedEchoServer(context=server_context, chatty=True)
2799 with server:
2800 with socket.socket() as s:
2801 with self.assertRaisesRegex(ValueError,
2802 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002803 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002804
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002805 def test_ecc_cert(self):
2806 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2807 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002808 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002809 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2810
2811 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2812 # load ECC cert
2813 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2814
2815 # correct hostname should verify
2816 server = ThreadedEchoServer(context=server_context, chatty=True)
2817 with server:
2818 with client_context.wrap_socket(socket.socket(),
2819 server_hostname=hostname) as s:
2820 s.connect((HOST, server.port))
2821 cert = s.getpeercert()
2822 self.assertTrue(cert, "Can't get peer certificate.")
2823 cipher = s.cipher()[0].split('-')
2824 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2825
2826 def test_dual_rsa_ecc(self):
2827 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2828 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002829 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2830 # algorithms.
2831 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002832 # only ECDSA certs
2833 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2834 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2835
2836 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2837 # load ECC and RSA key/cert pairs
2838 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2839 server_context.load_cert_chain(SIGNED_CERTFILE)
2840
2841 # correct hostname should verify
2842 server = ThreadedEchoServer(context=server_context, chatty=True)
2843 with server:
2844 with client_context.wrap_socket(socket.socket(),
2845 server_hostname=hostname) as s:
2846 s.connect((HOST, server.port))
2847 cert = s.getpeercert()
2848 self.assertTrue(cert, "Can't get peer certificate.")
2849 cipher = s.cipher()[0].split('-')
2850 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2851
Christian Heimes66e57422018-01-29 14:25:13 +01002852 def test_check_hostname_idn(self):
2853 if support.verbose:
2854 sys.stdout.write("\n")
2855
Christian Heimes11a14932018-02-24 02:35:08 +01002856 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002857 server_context.load_cert_chain(IDNSANSFILE)
2858
Christian Heimes11a14932018-02-24 02:35:08 +01002859 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002860 context.verify_mode = ssl.CERT_REQUIRED
2861 context.check_hostname = True
2862 context.load_verify_locations(SIGNING_CA)
2863
2864 # correct hostname should verify, when specified in several
2865 # different ways
2866 idn_hostnames = [
2867 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002868 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002869 ('xn--knig-5qa.idn.pythontest.net',
2870 'xn--knig-5qa.idn.pythontest.net'),
2871 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002872 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002873
2874 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002875 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002876 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2877 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2878 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002879 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2880
2881 # ('königsgäßchen.idna2008.pythontest.net',
2882 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2883 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2884 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2885 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2886 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2887
Christian Heimes66e57422018-01-29 14:25:13 +01002888 ]
2889 for server_hostname, expected_hostname in idn_hostnames:
2890 server = ThreadedEchoServer(context=server_context, chatty=True)
2891 with server:
2892 with context.wrap_socket(socket.socket(),
2893 server_hostname=server_hostname) as s:
2894 self.assertEqual(s.server_hostname, expected_hostname)
2895 s.connect((HOST, server.port))
2896 cert = s.getpeercert()
2897 self.assertEqual(s.server_hostname, expected_hostname)
2898 self.assertTrue(cert, "Can't get peer certificate.")
2899
Christian Heimes66e57422018-01-29 14:25:13 +01002900 # incorrect hostname should raise an exception
2901 server = ThreadedEchoServer(context=server_context, chatty=True)
2902 with server:
2903 with context.wrap_socket(socket.socket(),
2904 server_hostname="python.example.org") as s:
2905 with self.assertRaises(ssl.CertificateError):
2906 s.connect((HOST, server.port))
2907
Christian Heimes529525f2018-05-23 22:24:45 +02002908 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002909 """Connecting when the server rejects the client's certificate
2910
2911 Launch a server with CERT_REQUIRED, and check that trying to
2912 connect to it with a wrong client certificate fails.
2913 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002914 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002915 # load client cert that is not signed by trusted CA
2916 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002917 # require TLS client authentication
2918 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002919 # TLS 1.3 has different handshake
2920 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002921
2922 server = ThreadedEchoServer(
2923 context=server_context, chatty=True, connectionchatty=True,
2924 )
2925
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002926 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002927 client_context.wrap_socket(socket.socket(),
2928 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002929 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002930 # Expect either an SSL error about the server rejecting
2931 # the connection, or a low-level connection reset (which
2932 # sometimes happens on Windows)
2933 s.connect((HOST, server.port))
2934 except ssl.SSLError as e:
2935 if support.verbose:
2936 sys.stdout.write("\nSSLError is %r\n" % e)
2937 except OSError as e:
2938 if e.errno != errno.ECONNRESET:
2939 raise
2940 if support.verbose:
2941 sys.stdout.write("\nsocket.error is %r\n" % e)
2942 else:
2943 self.fail("Use of invalid cert should have failed!")
2944
Christian Heimes529525f2018-05-23 22:24:45 +02002945 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2946 def test_wrong_cert_tls13(self):
2947 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002948 # load client cert that is not signed by trusted CA
2949 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02002950 server_context.verify_mode = ssl.CERT_REQUIRED
2951 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2952 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2953
2954 server = ThreadedEchoServer(
2955 context=server_context, chatty=True, connectionchatty=True,
2956 )
2957 with server, \
2958 client_context.wrap_socket(socket.socket(),
2959 server_hostname=hostname) as s:
2960 # TLS 1.3 perform client cert exchange after handshake
2961 s.connect((HOST, server.port))
2962 try:
2963 s.write(b'data')
2964 s.read(4)
2965 except ssl.SSLError as e:
2966 if support.verbose:
2967 sys.stdout.write("\nSSLError is %r\n" % e)
2968 except OSError as e:
2969 if e.errno != errno.ECONNRESET:
2970 raise
2971 if support.verbose:
2972 sys.stdout.write("\nsocket.error is %r\n" % e)
2973 else:
2974 self.fail("Use of invalid cert should have failed!")
2975
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002976 def test_rude_shutdown(self):
2977 """A brutal shutdown of an SSL server should raise an OSError
2978 in the client when attempting handshake.
2979 """
2980 listener_ready = threading.Event()
2981 listener_gone = threading.Event()
2982
2983 s = socket.socket()
2984 port = support.bind_port(s, HOST)
2985
2986 # `listener` runs in a thread. It sits in an accept() until
2987 # the main thread connects. Then it rudely closes the socket,
2988 # and sets Event `listener_gone` to let the main thread know
2989 # the socket is gone.
2990 def listener():
2991 s.listen()
2992 listener_ready.set()
2993 newsock, addr = s.accept()
2994 newsock.close()
2995 s.close()
2996 listener_gone.set()
2997
2998 def connector():
2999 listener_ready.wait()
3000 with socket.socket() as c:
3001 c.connect((HOST, port))
3002 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00003003 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003004 ssl_sock = test_wrap_socket(c)
3005 except OSError:
3006 pass
3007 else:
3008 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00003009
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003010 t = threading.Thread(target=listener)
3011 t.start()
3012 try:
3013 connector()
3014 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003015 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01003016
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003017 def test_ssl_cert_verify_error(self):
3018 if support.verbose:
3019 sys.stdout.write("\n")
3020
3021 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
3022 server_context.load_cert_chain(SIGNED_CERTFILE)
3023
3024 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3025
3026 server = ThreadedEchoServer(context=server_context, chatty=True)
3027 with server:
3028 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003029 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003030 try:
3031 s.connect((HOST, server.port))
3032 except ssl.SSLError as e:
3033 msg = 'unable to get local issuer certificate'
3034 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3035 self.assertEqual(e.verify_code, 20)
3036 self.assertEqual(e.verify_message, msg)
3037 self.assertIn(msg, repr(e))
3038 self.assertIn('certificate verify failed', repr(e))
3039
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003040 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3041 "OpenSSL is compiled without SSLv2 support")
3042 def test_protocol_sslv2(self):
3043 """Connecting to an SSLv2 server with various client options"""
3044 if support.verbose:
3045 sys.stdout.write("\n")
3046 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3047 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3048 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003049 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003050 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3051 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3052 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3053 # SSLv23 client with specific SSL options
3054 if no_sslv2_implies_sslv3_hello():
3055 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003056 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003057 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003058 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003059 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003060 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003061 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003062
Christian Heimesa170fa12017-09-15 20:27:30 +02003063 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003064 """Connecting to an SSLv23 server with various client options"""
3065 if support.verbose:
3066 sys.stdout.write("\n")
3067 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003068 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003069 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003070 except OSError as x:
3071 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3072 if support.verbose:
3073 sys.stdout.write(
3074 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3075 % str(x))
3076 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003077 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3078 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3079 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003080
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003081 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003082 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3083 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3084 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003085
3086 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003087 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3088 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3089 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003090
3091 # Server with specific SSL options
3092 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003093 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003094 server_options=ssl.OP_NO_SSLv3)
3095 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003096 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003097 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003098 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003099 server_options=ssl.OP_NO_TLSv1)
3100
3101
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003102 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3103 "OpenSSL is compiled without SSLv3 support")
3104 def test_protocol_sslv3(self):
3105 """Connecting to an SSLv3 server with various client options"""
3106 if support.verbose:
3107 sys.stdout.write("\n")
3108 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3109 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3110 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3111 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3112 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003113 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003114 client_options=ssl.OP_NO_SSLv3)
3115 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3116 if no_sslv2_implies_sslv3_hello():
3117 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003118 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003119 False, client_options=ssl.OP_NO_SSLv2)
3120
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003121 def test_protocol_tlsv1(self):
3122 """Connecting to a TLSv1 server with various client options"""
3123 if support.verbose:
3124 sys.stdout.write("\n")
3125 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3126 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3127 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3128 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3129 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3130 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3131 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003132 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003133 client_options=ssl.OP_NO_TLSv1)
3134
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003135 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3136 "TLS version 1.1 not supported.")
3137 def test_protocol_tlsv1_1(self):
3138 """Connecting to a TLSv1.1 server with various client options.
3139 Testing against older TLS versions."""
3140 if support.verbose:
3141 sys.stdout.write("\n")
3142 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3143 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3144 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3145 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3146 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003147 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003148 client_options=ssl.OP_NO_TLSv1_1)
3149
Christian Heimesa170fa12017-09-15 20:27:30 +02003150 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003151 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3152 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3153
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003154 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3155 "TLS version 1.2 not supported.")
3156 def test_protocol_tlsv1_2(self):
3157 """Connecting to a TLSv1.2 server with various client options.
3158 Testing against older TLS versions."""
3159 if support.verbose:
3160 sys.stdout.write("\n")
3161 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3162 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3163 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3164 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3165 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3166 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3167 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003168 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003169 client_options=ssl.OP_NO_TLSv1_2)
3170
Christian Heimesa170fa12017-09-15 20:27:30 +02003171 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003172 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3173 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3174 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3175 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3176
3177 def test_starttls(self):
3178 """Switching from clear text to encrypted and back again."""
3179 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3180
3181 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003182 starttls_server=True,
3183 chatty=True,
3184 connectionchatty=True)
3185 wrapped = False
3186 with server:
3187 s = socket.socket()
3188 s.setblocking(1)
3189 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003190 if support.verbose:
3191 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003192 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003193 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003194 sys.stdout.write(
3195 " client: sending %r...\n" % indata)
3196 if wrapped:
3197 conn.write(indata)
3198 outdata = conn.read()
3199 else:
3200 s.send(indata)
3201 outdata = s.recv(1024)
3202 msg = outdata.strip().lower()
3203 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3204 # STARTTLS ok, switch to secure mode
3205 if support.verbose:
3206 sys.stdout.write(
3207 " client: read %r from server, starting TLS...\n"
3208 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003209 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003210 wrapped = True
3211 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3212 # ENDTLS ok, switch back to clear text
3213 if support.verbose:
3214 sys.stdout.write(
3215 " client: read %r from server, ending TLS...\n"
3216 % msg)
3217 s = conn.unwrap()
3218 wrapped = False
3219 else:
3220 if support.verbose:
3221 sys.stdout.write(
3222 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003223 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003224 sys.stdout.write(" client: closing connection.\n")
3225 if wrapped:
3226 conn.write(b"over\n")
3227 else:
3228 s.send(b"over\n")
3229 if wrapped:
3230 conn.close()
3231 else:
3232 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003233
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003234 def test_socketserver(self):
3235 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003236 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003237 # try to connect
3238 if support.verbose:
3239 sys.stdout.write('\n')
3240 with open(CERTFILE, 'rb') as f:
3241 d1 = f.read()
3242 d2 = ''
3243 # now fetch the same data from the HTTPS server
3244 url = 'https://localhost:%d/%s' % (
3245 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003246 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003247 f = urllib.request.urlopen(url, context=context)
3248 try:
3249 dlen = f.info().get("content-length")
3250 if dlen and (int(dlen) > 0):
3251 d2 = f.read(int(dlen))
3252 if support.verbose:
3253 sys.stdout.write(
3254 " client: read %d bytes from remote server '%s'\n"
3255 % (len(d2), server))
3256 finally:
3257 f.close()
3258 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003259
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003260 def test_asyncore_server(self):
3261 """Check the example asyncore integration."""
3262 if support.verbose:
3263 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003264
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003265 indata = b"FOO\n"
3266 server = AsyncoreEchoServer(CERTFILE)
3267 with server:
3268 s = test_wrap_socket(socket.socket())
3269 s.connect(('127.0.0.1', server.port))
3270 if support.verbose:
3271 sys.stdout.write(
3272 " client: sending %r...\n" % indata)
3273 s.write(indata)
3274 outdata = s.read()
3275 if support.verbose:
3276 sys.stdout.write(" client: read %r\n" % outdata)
3277 if outdata != indata.lower():
3278 self.fail(
3279 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3280 % (outdata[:20], len(outdata),
3281 indata[:20].lower(), len(indata)))
3282 s.write(b"over\n")
3283 if support.verbose:
3284 sys.stdout.write(" client: closing connection.\n")
3285 s.close()
3286 if support.verbose:
3287 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003288
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 def test_recv_send(self):
3290 """Test recv(), send() and friends."""
3291 if support.verbose:
3292 sys.stdout.write("\n")
3293
3294 server = ThreadedEchoServer(CERTFILE,
3295 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003296 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003297 cacerts=CERTFILE,
3298 chatty=True,
3299 connectionchatty=False)
3300 with server:
3301 s = test_wrap_socket(socket.socket(),
3302 server_side=False,
3303 certfile=CERTFILE,
3304 ca_certs=CERTFILE,
3305 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003306 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307 s.connect((HOST, server.port))
3308 # helper methods for standardising recv* method signatures
3309 def _recv_into():
3310 b = bytearray(b"\0"*100)
3311 count = s.recv_into(b)
3312 return b[:count]
3313
3314 def _recvfrom_into():
3315 b = bytearray(b"\0"*100)
3316 count, addr = s.recvfrom_into(b)
3317 return b[:count]
3318
3319 # (name, method, expect success?, *args, return value func)
3320 send_methods = [
3321 ('send', s.send, True, [], len),
3322 ('sendto', s.sendto, False, ["some.address"], len),
3323 ('sendall', s.sendall, True, [], lambda x: None),
3324 ]
3325 # (name, method, whether to expect success, *args)
3326 recv_methods = [
3327 ('recv', s.recv, True, []),
3328 ('recvfrom', s.recvfrom, False, ["some.address"]),
3329 ('recv_into', _recv_into, True, []),
3330 ('recvfrom_into', _recvfrom_into, False, []),
3331 ]
3332 data_prefix = "PREFIX_"
3333
3334 for (meth_name, send_meth, expect_success, args,
3335 ret_val_meth) in send_methods:
3336 indata = (data_prefix + meth_name).encode('ascii')
3337 try:
3338 ret = send_meth(indata, *args)
3339 msg = "sending with {}".format(meth_name)
3340 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3341 outdata = s.read()
3342 if outdata != indata.lower():
3343 self.fail(
3344 "While sending with <<{name:s}>> bad data "
3345 "<<{outdata:r}>> ({nout:d}) received; "
3346 "expected <<{indata:r}>> ({nin:d})\n".format(
3347 name=meth_name, outdata=outdata[:20],
3348 nout=len(outdata),
3349 indata=indata[:20], nin=len(indata)
3350 )
3351 )
3352 except ValueError as e:
3353 if expect_success:
3354 self.fail(
3355 "Failed to send with method <<{name:s}>>; "
3356 "expected to succeed.\n".format(name=meth_name)
3357 )
3358 if not str(e).startswith(meth_name):
3359 self.fail(
3360 "Method <<{name:s}>> failed with unexpected "
3361 "exception message: {exp:s}\n".format(
3362 name=meth_name, exp=e
3363 )
3364 )
3365
3366 for meth_name, recv_meth, expect_success, args in recv_methods:
3367 indata = (data_prefix + meth_name).encode('ascii')
3368 try:
3369 s.send(indata)
3370 outdata = recv_meth(*args)
3371 if outdata != indata.lower():
3372 self.fail(
3373 "While receiving with <<{name:s}>> bad data "
3374 "<<{outdata:r}>> ({nout:d}) received; "
3375 "expected <<{indata:r}>> ({nin:d})\n".format(
3376 name=meth_name, outdata=outdata[:20],
3377 nout=len(outdata),
3378 indata=indata[:20], nin=len(indata)
3379 )
3380 )
3381 except ValueError as e:
3382 if expect_success:
3383 self.fail(
3384 "Failed to receive with method <<{name:s}>>; "
3385 "expected to succeed.\n".format(name=meth_name)
3386 )
3387 if not str(e).startswith(meth_name):
3388 self.fail(
3389 "Method <<{name:s}>> failed with unexpected "
3390 "exception message: {exp:s}\n".format(
3391 name=meth_name, exp=e
3392 )
3393 )
3394 # consume data
3395 s.read()
3396
3397 # read(-1, buffer) is supported, even though read(-1) is not
3398 data = b"data"
3399 s.send(data)
3400 buffer = bytearray(len(data))
3401 self.assertEqual(s.read(-1, buffer), len(data))
3402 self.assertEqual(buffer, data)
3403
Christian Heimes888bbdc2017-09-07 14:18:21 -07003404 # sendall accepts bytes-like objects
3405 if ctypes is not None:
3406 ubyte = ctypes.c_ubyte * len(data)
3407 byteslike = ubyte.from_buffer_copy(data)
3408 s.sendall(byteslike)
3409 self.assertEqual(s.read(), data)
3410
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003411 # Make sure sendmsg et al are disallowed to avoid
3412 # inadvertent disclosure of data and/or corruption
3413 # of the encrypted data stream
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003414 self.assertRaises(NotImplementedError, s.dup)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003415 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3416 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3417 self.assertRaises(NotImplementedError,
Serhiy Storchaka42b1d612018-12-06 22:36:55 +02003418 s.recvmsg_into, [bytearray(100)])
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003419 s.write(b"over\n")
3420
3421 self.assertRaises(ValueError, s.recv, -1)
3422 self.assertRaises(ValueError, s.read, -1)
3423
3424 s.close()
3425
3426 def test_recv_zero(self):
3427 server = ThreadedEchoServer(CERTFILE)
3428 server.__enter__()
3429 self.addCleanup(server.__exit__, None, None)
3430 s = socket.create_connection((HOST, server.port))
3431 self.addCleanup(s.close)
3432 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3433 self.addCleanup(s.close)
3434
3435 # recv/read(0) should return no data
3436 s.send(b"data")
3437 self.assertEqual(s.recv(0), b"")
3438 self.assertEqual(s.read(0), b"")
3439 self.assertEqual(s.read(), b"data")
3440
3441 # Should not block if the other end sends no data
3442 s.setblocking(False)
3443 self.assertEqual(s.recv(0), b"")
3444 self.assertEqual(s.recv_into(bytearray()), 0)
3445
3446 def test_nonblocking_send(self):
3447 server = ThreadedEchoServer(CERTFILE,
3448 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003449 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003450 cacerts=CERTFILE,
3451 chatty=True,
3452 connectionchatty=False)
3453 with server:
3454 s = test_wrap_socket(socket.socket(),
3455 server_side=False,
3456 certfile=CERTFILE,
3457 ca_certs=CERTFILE,
3458 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003459 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003460 s.connect((HOST, server.port))
3461 s.setblocking(False)
3462
3463 # If we keep sending data, at some point the buffers
3464 # will be full and the call will block
3465 buf = bytearray(8192)
3466 def fill_buffer():
3467 while True:
3468 s.send(buf)
3469 self.assertRaises((ssl.SSLWantWriteError,
3470 ssl.SSLWantReadError), fill_buffer)
3471
3472 # Now read all the output and discard it
3473 s.setblocking(True)
3474 s.close()
3475
3476 def test_handshake_timeout(self):
3477 # Issue #5103: SSL handshake must respect the socket timeout
3478 server = socket.socket(socket.AF_INET)
3479 host = "127.0.0.1"
3480 port = support.bind_port(server)
3481 started = threading.Event()
3482 finish = False
3483
3484 def serve():
3485 server.listen()
3486 started.set()
3487 conns = []
3488 while not finish:
3489 r, w, e = select.select([server], [], [], 0.1)
3490 if server in r:
3491 # Let the socket hang around rather than having
3492 # it closed by garbage collection.
3493 conns.append(server.accept()[0])
3494 for sock in conns:
3495 sock.close()
3496
3497 t = threading.Thread(target=serve)
3498 t.start()
3499 started.wait()
3500
3501 try:
3502 try:
3503 c = socket.socket(socket.AF_INET)
3504 c.settimeout(0.2)
3505 c.connect((host, port))
3506 # Will attempt handshake and time out
3507 self.assertRaisesRegex(socket.timeout, "timed out",
3508 test_wrap_socket, c)
3509 finally:
3510 c.close()
3511 try:
3512 c = socket.socket(socket.AF_INET)
3513 c = test_wrap_socket(c)
3514 c.settimeout(0.2)
3515 # Will attempt handshake and time out
3516 self.assertRaisesRegex(socket.timeout, "timed out",
3517 c.connect, (host, port))
3518 finally:
3519 c.close()
3520 finally:
3521 finish = True
3522 t.join()
3523 server.close()
3524
3525 def test_server_accept(self):
3526 # Issue #16357: accept() on a SSLSocket created through
3527 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003528 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003529 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003530 context.load_verify_locations(SIGNING_CA)
3531 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003532 server = socket.socket(socket.AF_INET)
3533 host = "127.0.0.1"
3534 port = support.bind_port(server)
3535 server = context.wrap_socket(server, server_side=True)
3536 self.assertTrue(server.server_side)
3537
3538 evt = threading.Event()
3539 remote = None
3540 peer = None
3541 def serve():
3542 nonlocal remote, peer
3543 server.listen()
3544 # Block on the accept and wait on the connection to close.
3545 evt.set()
3546 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003547 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003548
3549 t = threading.Thread(target=serve)
3550 t.start()
3551 # Client wait until server setup and perform a connect.
3552 evt.wait()
3553 client = context.wrap_socket(socket.socket())
3554 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003555 client.send(b'data')
3556 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003557 client_addr = client.getsockname()
3558 client.close()
3559 t.join()
3560 remote.close()
3561 server.close()
3562 # Sanity checks.
3563 self.assertIsInstance(remote, ssl.SSLSocket)
3564 self.assertEqual(peer, client_addr)
3565
3566 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003567 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003568 with context.wrap_socket(socket.socket()) as sock:
3569 with self.assertRaises(OSError) as cm:
3570 sock.getpeercert()
3571 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3572
3573 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003574 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003575 with context.wrap_socket(socket.socket()) as sock:
3576 with self.assertRaises(OSError) as cm:
3577 sock.do_handshake()
3578 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3579
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003580 def test_no_shared_ciphers(self):
3581 client_context, server_context, hostname = testing_context()
3582 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3583 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003584 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003585 client_context.set_ciphers("AES128")
3586 server_context.set_ciphers("AES256")
3587 with ThreadedEchoServer(context=server_context) as server:
3588 with client_context.wrap_socket(socket.socket(),
3589 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003590 with self.assertRaises(OSError):
3591 s.connect((HOST, server.port))
3592 self.assertIn("no shared cipher", server.conn_errors[0])
3593
3594 def test_version_basic(self):
3595 """
3596 Basic tests for SSLSocket.version().
3597 More tests are done in the test_protocol_*() methods.
3598 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003599 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3600 context.check_hostname = False
3601 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003602 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003603 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003604 chatty=False) as server:
3605 with context.wrap_socket(socket.socket()) as s:
3606 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003607 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003609 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003610 self.assertEqual(s.version(), 'TLSv1.3')
3611 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003612 self.assertEqual(s.version(), 'TLSv1.2')
3613 else: # 0.9.8 to 1.0.1
3614 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003615 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003616 self.assertIs(s.version(), None)
3617
Christian Heimescb5b68a2017-09-07 18:07:00 -07003618 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3619 "test requires TLSv1.3 enabled OpenSSL")
3620 def test_tls1_3(self):
3621 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3622 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003623 context.options |= (
3624 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3625 )
3626 with ThreadedEchoServer(context=context) as server:
3627 with context.wrap_socket(socket.socket()) as s:
3628 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003629 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003630 'TLS_AES_256_GCM_SHA384',
3631 'TLS_CHACHA20_POLY1305_SHA256',
3632 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003633 })
3634 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003635
Christian Heimes698dde12018-02-27 11:54:43 +01003636 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3637 "required OpenSSL 1.1.0g")
3638 def test_min_max_version(self):
3639 client_context, server_context, hostname = testing_context()
3640 # client TLSv1.0 to 1.2
3641 client_context.minimum_version = ssl.TLSVersion.TLSv1
3642 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3643 # server only TLSv1.2
3644 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3645 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3646
3647 with ThreadedEchoServer(context=server_context) as server:
3648 with client_context.wrap_socket(socket.socket(),
3649 server_hostname=hostname) as s:
3650 s.connect((HOST, server.port))
3651 self.assertEqual(s.version(), 'TLSv1.2')
3652
3653 # client 1.0 to 1.2, server 1.0 to 1.1
3654 server_context.minimum_version = ssl.TLSVersion.TLSv1
3655 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3656
3657 with ThreadedEchoServer(context=server_context) as server:
3658 with client_context.wrap_socket(socket.socket(),
3659 server_hostname=hostname) as s:
3660 s.connect((HOST, server.port))
3661 self.assertEqual(s.version(), 'TLSv1.1')
3662
3663 # client 1.0, server 1.2 (mismatch)
3664 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3665 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3666 client_context.minimum_version = ssl.TLSVersion.TLSv1
3667 client_context.maximum_version = ssl.TLSVersion.TLSv1
3668 with ThreadedEchoServer(context=server_context) as server:
3669 with client_context.wrap_socket(socket.socket(),
3670 server_hostname=hostname) as s:
3671 with self.assertRaises(ssl.SSLError) as e:
3672 s.connect((HOST, server.port))
3673 self.assertIn("alert", str(e.exception))
3674
3675
3676 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3677 "required OpenSSL 1.1.0g")
3678 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3679 def test_min_max_version_sslv3(self):
3680 client_context, server_context, hostname = testing_context()
3681 server_context.minimum_version = ssl.TLSVersion.SSLv3
3682 client_context.minimum_version = ssl.TLSVersion.SSLv3
3683 client_context.maximum_version = ssl.TLSVersion.SSLv3
3684 with ThreadedEchoServer(context=server_context) as server:
3685 with client_context.wrap_socket(socket.socket(),
3686 server_hostname=hostname) as s:
3687 s.connect((HOST, server.port))
3688 self.assertEqual(s.version(), 'SSLv3')
3689
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003690 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3691 def test_default_ecdh_curve(self):
3692 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3693 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003694 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003695 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003696 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3697 # cipher name.
3698 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003699 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3700 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3701 # our default cipher list should prefer ECDH-based ciphers
3702 # automatically.
3703 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3704 context.set_ciphers("ECCdraft:ECDH")
3705 with ThreadedEchoServer(context=context) as server:
3706 with context.wrap_socket(socket.socket()) as s:
3707 s.connect((HOST, server.port))
3708 self.assertIn("ECDH", s.cipher()[0])
3709
3710 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3711 "'tls-unique' channel binding not available")
3712 def test_tls_unique_channel_binding(self):
3713 """Test tls-unique channel binding."""
3714 if support.verbose:
3715 sys.stdout.write("\n")
3716
Christian Heimes05d9fe32018-02-27 08:55:39 +01003717 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003718
3719 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003720 chatty=True,
3721 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003722
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003723 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003724 with client_context.wrap_socket(
3725 socket.socket(),
3726 server_hostname=hostname) as s:
3727 s.connect((HOST, server.port))
3728 # get the data
3729 cb_data = s.get_channel_binding("tls-unique")
3730 if support.verbose:
3731 sys.stdout.write(
3732 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003733
Christian Heimes05d9fe32018-02-27 08:55:39 +01003734 # check if it is sane
3735 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003736 if s.version() == 'TLSv1.3':
3737 self.assertEqual(len(cb_data), 48)
3738 else:
3739 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003740
Christian Heimes05d9fe32018-02-27 08:55:39 +01003741 # and compare with the peers version
3742 s.write(b"CB tls-unique\n")
3743 peer_data_repr = s.read().strip()
3744 self.assertEqual(peer_data_repr,
3745 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003746
3747 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003748 with client_context.wrap_socket(
3749 socket.socket(),
3750 server_hostname=hostname) as s:
3751 s.connect((HOST, server.port))
3752 new_cb_data = s.get_channel_binding("tls-unique")
3753 if support.verbose:
3754 sys.stdout.write(
3755 "got another channel binding data: {0!r}\n".format(
3756 new_cb_data)
3757 )
3758 # is it really unique
3759 self.assertNotEqual(cb_data, new_cb_data)
3760 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003761 if s.version() == 'TLSv1.3':
3762 self.assertEqual(len(cb_data), 48)
3763 else:
3764 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003765 s.write(b"CB tls-unique\n")
3766 peer_data_repr = s.read().strip()
3767 self.assertEqual(peer_data_repr,
3768 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003769
3770 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003771 client_context, server_context, hostname = testing_context()
3772 stats = server_params_test(client_context, server_context,
3773 chatty=True, connectionchatty=True,
3774 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003775 if support.verbose:
3776 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3777 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3778
3779 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3780 "ssl.OP_NO_COMPRESSION needed for this test")
3781 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003782 client_context, server_context, hostname = testing_context()
3783 client_context.options |= ssl.OP_NO_COMPRESSION
3784 server_context.options |= ssl.OP_NO_COMPRESSION
3785 stats = server_params_test(client_context, server_context,
3786 chatty=True, connectionchatty=True,
3787 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003788 self.assertIs(stats['compression'], None)
3789
3790 def test_dh_params(self):
3791 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003792 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003793 # test scenario needs TLS <= 1.2
3794 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003795 server_context.load_dh_params(DHFILE)
3796 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003797 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003798 stats = server_params_test(client_context, server_context,
3799 chatty=True, connectionchatty=True,
3800 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003801 cipher = stats["cipher"][0]
3802 parts = cipher.split("-")
3803 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3804 self.fail("Non-DH cipher: " + cipher[0])
3805
Christian Heimesb7b92252018-02-25 09:49:31 +01003806 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003807 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003808 def test_ecdh_curve(self):
3809 # server secp384r1, client auto
3810 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003811
Christian Heimesb7b92252018-02-25 09:49:31 +01003812 server_context.set_ecdh_curve("secp384r1")
3813 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3814 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3815 stats = server_params_test(client_context, server_context,
3816 chatty=True, connectionchatty=True,
3817 sni_name=hostname)
3818
3819 # server auto, client secp384r1
3820 client_context, server_context, hostname = testing_context()
3821 client_context.set_ecdh_curve("secp384r1")
3822 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3823 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3824 stats = server_params_test(client_context, server_context,
3825 chatty=True, connectionchatty=True,
3826 sni_name=hostname)
3827
3828 # server / client curve mismatch
3829 client_context, server_context, hostname = testing_context()
3830 client_context.set_ecdh_curve("prime256v1")
3831 server_context.set_ecdh_curve("secp384r1")
3832 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3833 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3834 try:
3835 stats = server_params_test(client_context, server_context,
3836 chatty=True, connectionchatty=True,
3837 sni_name=hostname)
3838 except ssl.SSLError:
3839 pass
3840 else:
3841 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003842 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003843 self.fail("mismatch curve did not fail")
3844
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003845 def test_selected_alpn_protocol(self):
3846 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003847 client_context, server_context, hostname = testing_context()
3848 stats = server_params_test(client_context, server_context,
3849 chatty=True, connectionchatty=True,
3850 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003851 self.assertIs(stats['client_alpn_protocol'], None)
3852
3853 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3854 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3855 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003856 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003857 server_context.set_alpn_protocols(['foo', 'bar'])
3858 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003859 chatty=True, connectionchatty=True,
3860 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003861 self.assertIs(stats['client_alpn_protocol'], None)
3862
3863 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3864 def test_alpn_protocols(self):
3865 server_protocols = ['foo', 'bar', 'milkshake']
3866 protocol_tests = [
3867 (['foo', 'bar'], 'foo'),
3868 (['bar', 'foo'], 'foo'),
3869 (['milkshake'], 'milkshake'),
3870 (['http/3.0', 'http/4.0'], None)
3871 ]
3872 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003873 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003874 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003875 client_context.set_alpn_protocols(client_protocols)
3876
3877 try:
3878 stats = server_params_test(client_context,
3879 server_context,
3880 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003881 connectionchatty=True,
3882 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003883 except ssl.SSLError as e:
3884 stats = e
3885
Christian Heimes05d9fe32018-02-27 08:55:39 +01003886 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003887 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3888 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3889 self.assertIsInstance(stats, ssl.SSLError)
3890 else:
3891 msg = "failed trying %s (s) and %s (c).\n" \
3892 "was expecting %s, but got %%s from the %%s" \
3893 % (str(server_protocols), str(client_protocols),
3894 str(expected))
3895 client_result = stats['client_alpn_protocol']
3896 self.assertEqual(client_result, expected,
3897 msg % (client_result, "client"))
3898 server_result = stats['server_alpn_protocols'][-1] \
3899 if len(stats['server_alpn_protocols']) else 'nothing'
3900 self.assertEqual(server_result, expected,
3901 msg % (server_result, "server"))
3902
3903 def test_selected_npn_protocol(self):
3904 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003905 client_context, server_context, hostname = testing_context()
3906 stats = server_params_test(client_context, server_context,
3907 chatty=True, connectionchatty=True,
3908 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003909 self.assertIs(stats['client_npn_protocol'], None)
3910
3911 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3912 def test_npn_protocols(self):
3913 server_protocols = ['http/1.1', 'spdy/2']
3914 protocol_tests = [
3915 (['http/1.1', 'spdy/2'], 'http/1.1'),
3916 (['spdy/2', 'http/1.1'], 'http/1.1'),
3917 (['spdy/2', 'test'], 'spdy/2'),
3918 (['abc', 'def'], 'abc')
3919 ]
3920 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003921 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003922 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003923 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003924 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003925 chatty=True, connectionchatty=True,
3926 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003927 msg = "failed trying %s (s) and %s (c).\n" \
3928 "was expecting %s, but got %%s from the %%s" \
3929 % (str(server_protocols), str(client_protocols),
3930 str(expected))
3931 client_result = stats['client_npn_protocol']
3932 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3933 server_result = stats['server_npn_protocols'][-1] \
3934 if len(stats['server_npn_protocols']) else 'nothing'
3935 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3936
3937 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003938 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003939 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003940 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003941 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003942 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003943 client_context.load_verify_locations(SIGNING_CA)
3944 return server_context, other_context, client_context
3945
3946 def check_common_name(self, stats, name):
3947 cert = stats['peercert']
3948 self.assertIn((('commonName', name),), cert['subject'])
3949
3950 @needs_sni
3951 def test_sni_callback(self):
3952 calls = []
3953 server_context, other_context, client_context = self.sni_contexts()
3954
Christian Heimesa170fa12017-09-15 20:27:30 +02003955 client_context.check_hostname = False
3956
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003957 def servername_cb(ssl_sock, server_name, initial_context):
3958 calls.append((server_name, initial_context))
3959 if server_name is not None:
3960 ssl_sock.context = other_context
3961 server_context.set_servername_callback(servername_cb)
3962
3963 stats = server_params_test(client_context, server_context,
3964 chatty=True,
3965 sni_name='supermessage')
3966 # The hostname was fetched properly, and the certificate was
3967 # changed for the connection.
3968 self.assertEqual(calls, [("supermessage", server_context)])
3969 # CERTFILE4 was selected
3970 self.check_common_name(stats, 'fakehostname')
3971
3972 calls = []
3973 # The callback is called with server_name=None
3974 stats = server_params_test(client_context, server_context,
3975 chatty=True,
3976 sni_name=None)
3977 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003978 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003979
3980 # Check disabling the callback
3981 calls = []
3982 server_context.set_servername_callback(None)
3983
3984 stats = server_params_test(client_context, server_context,
3985 chatty=True,
3986 sni_name='notfunny')
3987 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003988 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003989 self.assertEqual(calls, [])
3990
3991 @needs_sni
3992 def test_sni_callback_alert(self):
3993 # Returning a TLS alert is reflected to the connecting client
3994 server_context, other_context, client_context = self.sni_contexts()
3995
3996 def cb_returning_alert(ssl_sock, server_name, initial_context):
3997 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3998 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003999 with self.assertRaises(ssl.SSLError) as cm:
4000 stats = server_params_test(client_context, server_context,
4001 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004002 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004003 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004004
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004005 @needs_sni
4006 def test_sni_callback_raising(self):
4007 # Raising fails the connection with a TLS handshake failure alert.
4008 server_context, other_context, client_context = self.sni_contexts()
4009
4010 def cb_raising(ssl_sock, server_name, initial_context):
4011 1/0
4012 server_context.set_servername_callback(cb_raising)
4013
4014 with self.assertRaises(ssl.SSLError) as cm, \
4015 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004016 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004017 chatty=False,
4018 sni_name='supermessage')
4019 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
4020 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02004021
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004022 @needs_sni
4023 def test_sni_callback_wrong_return_type(self):
4024 # Returning the wrong return type terminates the TLS connection
4025 # with an internal error alert.
4026 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004027
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004028 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4029 return "foo"
4030 server_context.set_servername_callback(cb_wrong_return_type)
4031
4032 with self.assertRaises(ssl.SSLError) as cm, \
4033 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004034 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004035 chatty=False,
4036 sni_name='supermessage')
4037 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4038 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004039
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004040 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004041 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004042 client_context.set_ciphers("AES128:AES256")
4043 server_context.set_ciphers("AES256")
4044 expected_algs = [
4045 "AES256", "AES-256",
4046 # TLS 1.3 ciphers are always enabled
4047 "TLS_CHACHA20", "TLS_AES",
4048 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004049
Christian Heimesa170fa12017-09-15 20:27:30 +02004050 stats = server_params_test(client_context, server_context,
4051 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004052 ciphers = stats['server_shared_ciphers'][0]
4053 self.assertGreater(len(ciphers), 0)
4054 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004055 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004056 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004057
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004058 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004059 client_context, server_context, hostname = testing_context()
4060 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004061
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004062 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004063 s = client_context.wrap_socket(socket.socket(),
4064 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004065 s.connect((HOST, server.port))
4066 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004067
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004068 self.assertRaises(ValueError, s.read, 1024)
4069 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004070
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004071 def test_sendfile(self):
4072 TEST_DATA = b"x" * 512
4073 with open(support.TESTFN, 'wb') as f:
4074 f.write(TEST_DATA)
4075 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004076 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004077 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004078 context.load_verify_locations(SIGNING_CA)
4079 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004080 server = ThreadedEchoServer(context=context, chatty=False)
4081 with server:
4082 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004083 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004084 with open(support.TESTFN, 'rb') as file:
4085 s.sendfile(file)
4086 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004087
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004088 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004089 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004090 # TODO: sessions aren't compatible with TLSv1.3 yet
4091 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004092
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004094 stats = server_params_test(client_context, server_context,
4095 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 session = stats['session']
4097 self.assertTrue(session.id)
4098 self.assertGreater(session.time, 0)
4099 self.assertGreater(session.timeout, 0)
4100 self.assertTrue(session.has_ticket)
4101 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4102 self.assertGreater(session.ticket_lifetime_hint, 0)
4103 self.assertFalse(stats['session_reused'])
4104 sess_stat = server_context.session_stats()
4105 self.assertEqual(sess_stat['accept'], 1)
4106 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004107
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004108 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004109 stats = server_params_test(client_context, server_context,
4110 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004111 sess_stat = server_context.session_stats()
4112 self.assertEqual(sess_stat['accept'], 2)
4113 self.assertEqual(sess_stat['hits'], 1)
4114 self.assertTrue(stats['session_reused'])
4115 session2 = stats['session']
4116 self.assertEqual(session2.id, session.id)
4117 self.assertEqual(session2, session)
4118 self.assertIsNot(session2, session)
4119 self.assertGreaterEqual(session2.time, session.time)
4120 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004121
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004122 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004123 stats = server_params_test(client_context, server_context,
4124 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004125 self.assertFalse(stats['session_reused'])
4126 session3 = stats['session']
4127 self.assertNotEqual(session3.id, session.id)
4128 self.assertNotEqual(session3, session)
4129 sess_stat = server_context.session_stats()
4130 self.assertEqual(sess_stat['accept'], 3)
4131 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004132
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004133 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004134 stats = server_params_test(client_context, server_context,
4135 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004136 self.assertTrue(stats['session_reused'])
4137 session4 = stats['session']
4138 self.assertEqual(session4.id, session.id)
4139 self.assertEqual(session4, session)
4140 self.assertGreaterEqual(session4.time, session.time)
4141 self.assertGreaterEqual(session4.timeout, session.timeout)
4142 sess_stat = server_context.session_stats()
4143 self.assertEqual(sess_stat['accept'], 4)
4144 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004145
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004146 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004147 client_context, server_context, hostname = testing_context()
4148 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004149
Christian Heimes05d9fe32018-02-27 08:55:39 +01004150 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004151 client_context.options |= ssl.OP_NO_TLSv1_3
4152 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004153
Christian Heimesa170fa12017-09-15 20:27:30 +02004154 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004155 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004156 with client_context.wrap_socket(socket.socket(),
4157 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004158 # session is None before handshake
4159 self.assertEqual(s.session, None)
4160 self.assertEqual(s.session_reused, None)
4161 s.connect((HOST, server.port))
4162 session = s.session
4163 self.assertTrue(session)
4164 with self.assertRaises(TypeError) as e:
4165 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004166 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004167
Christian Heimesa170fa12017-09-15 20:27:30 +02004168 with client_context.wrap_socket(socket.socket(),
4169 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004170 s.connect((HOST, server.port))
4171 # cannot set session after handshake
4172 with self.assertRaises(ValueError) as e:
4173 s.session = session
4174 self.assertEqual(str(e.exception),
4175 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004176
Christian Heimesa170fa12017-09-15 20:27:30 +02004177 with client_context.wrap_socket(socket.socket(),
4178 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004179 # can set session before handshake and before the
4180 # connection was established
4181 s.session = session
4182 s.connect((HOST, server.port))
4183 self.assertEqual(s.session.id, session.id)
4184 self.assertEqual(s.session, session)
4185 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004186
Christian Heimesa170fa12017-09-15 20:27:30 +02004187 with client_context2.wrap_socket(socket.socket(),
4188 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004189 # cannot re-use session with a different SSLContext
4190 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004191 s.session = session
4192 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004193 self.assertEqual(str(e.exception),
4194 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004195
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004196
Christian Heimes9fb051f2018-09-23 08:32:31 +02004197@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4198class TestPostHandshakeAuth(unittest.TestCase):
4199 def test_pha_setter(self):
4200 protocols = [
4201 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4202 ]
4203 for protocol in protocols:
4204 ctx = ssl.SSLContext(protocol)
4205 self.assertEqual(ctx.post_handshake_auth, False)
4206
4207 ctx.post_handshake_auth = True
4208 self.assertEqual(ctx.post_handshake_auth, True)
4209
4210 ctx.verify_mode = ssl.CERT_REQUIRED
4211 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4212 self.assertEqual(ctx.post_handshake_auth, True)
4213
4214 ctx.post_handshake_auth = False
4215 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4216 self.assertEqual(ctx.post_handshake_auth, False)
4217
4218 ctx.verify_mode = ssl.CERT_OPTIONAL
4219 ctx.post_handshake_auth = True
4220 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4221 self.assertEqual(ctx.post_handshake_auth, True)
4222
4223 def test_pha_required(self):
4224 client_context, server_context, hostname = testing_context()
4225 server_context.post_handshake_auth = True
4226 server_context.verify_mode = ssl.CERT_REQUIRED
4227 client_context.post_handshake_auth = True
4228 client_context.load_cert_chain(SIGNED_CERTFILE)
4229
4230 server = ThreadedEchoServer(context=server_context, chatty=False)
4231 with server:
4232 with client_context.wrap_socket(socket.socket(),
4233 server_hostname=hostname) as s:
4234 s.connect((HOST, server.port))
4235 s.write(b'HASCERT')
4236 self.assertEqual(s.recv(1024), b'FALSE\n')
4237 s.write(b'PHA')
4238 self.assertEqual(s.recv(1024), b'OK\n')
4239 s.write(b'HASCERT')
4240 self.assertEqual(s.recv(1024), b'TRUE\n')
4241 # PHA method just returns true when cert is already available
4242 s.write(b'PHA')
4243 self.assertEqual(s.recv(1024), b'OK\n')
4244 s.write(b'GETCERT')
4245 cert_text = s.recv(4096).decode('us-ascii')
4246 self.assertIn('Python Software Foundation CA', cert_text)
4247
4248 def test_pha_required_nocert(self):
4249 client_context, server_context, hostname = testing_context()
4250 server_context.post_handshake_auth = True
4251 server_context.verify_mode = ssl.CERT_REQUIRED
4252 client_context.post_handshake_auth = True
4253
4254 server = ThreadedEchoServer(context=server_context, chatty=False)
4255 with server:
4256 with client_context.wrap_socket(socket.socket(),
4257 server_hostname=hostname) as s:
4258 s.connect((HOST, server.port))
4259 s.write(b'PHA')
4260 # receive CertificateRequest
4261 self.assertEqual(s.recv(1024), b'OK\n')
4262 # send empty Certificate + Finish
4263 s.write(b'HASCERT')
4264 # receive alert
4265 with self.assertRaisesRegex(
4266 ssl.SSLError,
4267 'tlsv13 alert certificate required'):
4268 s.recv(1024)
4269
4270 def test_pha_optional(self):
4271 if support.verbose:
4272 sys.stdout.write("\n")
4273
4274 client_context, server_context, hostname = testing_context()
4275 server_context.post_handshake_auth = True
4276 server_context.verify_mode = ssl.CERT_REQUIRED
4277 client_context.post_handshake_auth = True
4278 client_context.load_cert_chain(SIGNED_CERTFILE)
4279
4280 # check CERT_OPTIONAL
4281 server_context.verify_mode = ssl.CERT_OPTIONAL
4282 server = ThreadedEchoServer(context=server_context, chatty=False)
4283 with server:
4284 with client_context.wrap_socket(socket.socket(),
4285 server_hostname=hostname) as s:
4286 s.connect((HOST, server.port))
4287 s.write(b'HASCERT')
4288 self.assertEqual(s.recv(1024), b'FALSE\n')
4289 s.write(b'PHA')
4290 self.assertEqual(s.recv(1024), b'OK\n')
4291 s.write(b'HASCERT')
4292 self.assertEqual(s.recv(1024), b'TRUE\n')
4293
4294 def test_pha_optional_nocert(self):
4295 if support.verbose:
4296 sys.stdout.write("\n")
4297
4298 client_context, server_context, hostname = testing_context()
4299 server_context.post_handshake_auth = True
4300 server_context.verify_mode = ssl.CERT_OPTIONAL
4301 client_context.post_handshake_auth = True
4302
4303 server = ThreadedEchoServer(context=server_context, chatty=False)
4304 with server:
4305 with client_context.wrap_socket(socket.socket(),
4306 server_hostname=hostname) as s:
4307 s.connect((HOST, server.port))
4308 s.write(b'HASCERT')
4309 self.assertEqual(s.recv(1024), b'FALSE\n')
4310 s.write(b'PHA')
4311 self.assertEqual(s.recv(1024), b'OK\n')
4312 # optional doens't fail when client does not have a cert
4313 s.write(b'HASCERT')
4314 self.assertEqual(s.recv(1024), b'FALSE\n')
4315
4316 def test_pha_no_pha_client(self):
4317 client_context, server_context, hostname = testing_context()
4318 server_context.post_handshake_auth = True
4319 server_context.verify_mode = ssl.CERT_REQUIRED
4320 client_context.load_cert_chain(SIGNED_CERTFILE)
4321
4322 server = ThreadedEchoServer(context=server_context, chatty=False)
4323 with server:
4324 with client_context.wrap_socket(socket.socket(),
4325 server_hostname=hostname) as s:
4326 s.connect((HOST, server.port))
4327 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4328 s.verify_client_post_handshake()
4329 s.write(b'PHA')
4330 self.assertIn(b'extension not received', s.recv(1024))
4331
4332 def test_pha_no_pha_server(self):
4333 # server doesn't have PHA enabled, cert is requested in handshake
4334 client_context, server_context, hostname = testing_context()
4335 server_context.verify_mode = ssl.CERT_REQUIRED
4336 client_context.post_handshake_auth = True
4337 client_context.load_cert_chain(SIGNED_CERTFILE)
4338
4339 server = ThreadedEchoServer(context=server_context, chatty=False)
4340 with server:
4341 with client_context.wrap_socket(socket.socket(),
4342 server_hostname=hostname) as s:
4343 s.connect((HOST, server.port))
4344 s.write(b'HASCERT')
4345 self.assertEqual(s.recv(1024), b'TRUE\n')
4346 # PHA doesn't fail if there is already a cert
4347 s.write(b'PHA')
4348 self.assertEqual(s.recv(1024), b'OK\n')
4349 s.write(b'HASCERT')
4350 self.assertEqual(s.recv(1024), b'TRUE\n')
4351
4352 def test_pha_not_tls13(self):
4353 # TLS 1.2
4354 client_context, server_context, hostname = testing_context()
4355 server_context.verify_mode = ssl.CERT_REQUIRED
4356 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4357 client_context.post_handshake_auth = True
4358 client_context.load_cert_chain(SIGNED_CERTFILE)
4359
4360 server = ThreadedEchoServer(context=server_context, chatty=False)
4361 with server:
4362 with client_context.wrap_socket(socket.socket(),
4363 server_hostname=hostname) as s:
4364 s.connect((HOST, server.port))
4365 # PHA fails for TLS != 1.3
4366 s.write(b'PHA')
4367 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4368
4369
Thomas Woutersed03b412007-08-28 21:37:11 +00004370def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004371 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004372 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004373 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004374 'Mac': platform.mac_ver,
4375 'Windows': platform.win32_ver,
4376 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004377 for name, func in plats.items():
4378 plat = func()
4379 if plat and plat[0]:
4380 plat = '%s %r' % (name, plat)
4381 break
4382 else:
4383 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004384 print("test_ssl: testing with %r %r" %
4385 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4386 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004387 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004388 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4389 try:
4390 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4391 except AttributeError:
4392 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004393
Antoine Pitrou152efa22010-05-16 18:19:27 +00004394 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004395 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004396 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004397 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004398 BADCERT, BADKEY, EMPTYCERT]:
4399 if not os.path.exists(filename):
4400 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004401
Martin Panter3840b2a2016-03-27 01:53:46 +00004402 tests = [
4403 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004404 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes9fb051f2018-09-23 08:32:31 +02004405 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004406 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004407
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004408 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004409 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004410
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004411 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004412 try:
4413 support.run_unittest(*tests)
4414 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004415 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004416
4417if __name__ == "__main__":
4418 test_main()